commit
658248e3b1
4
Makefile
4
Makefile
|
@ -35,12 +35,12 @@ EUNIT_OPTS = verbose
|
||||||
# CT_SUITES = emqx_frame
|
# CT_SUITES = emqx_frame
|
||||||
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
|
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
|
||||||
|
|
||||||
CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_connection emqx_session \
|
CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
|
||||||
emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
|
emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
|
||||||
emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
|
emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
|
||||||
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
|
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
|
||||||
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
|
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
|
||||||
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub
|
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge emqx_hooks
|
||||||
|
|
||||||
CT_NODE_NAME = emqxct@127.0.0.1
|
CT_NODE_NAME = emqxct@127.0.0.1
|
||||||
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
|
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
|
||||||
|
|
|
@ -1149,10 +1149,10 @@ listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
|
||||||
## Value: on | off
|
## Value: on | off
|
||||||
## listener.ssl.external.honor_cipher_order = on
|
## listener.ssl.external.honor_cipher_order = on
|
||||||
|
|
||||||
## Use the CN field from the client certificate as a username.
|
## Use the CN, EN or CRT field from the client certificate as a username.
|
||||||
## Notice that 'verify' should be set as 'verify_peer'.
|
## Notice that 'verify' should be set as 'verify_peer'.
|
||||||
##
|
##
|
||||||
## Value: cn | en
|
## Value: cn | en | crt
|
||||||
## listener.ssl.external.peer_cert_as_username = cn
|
## listener.ssl.external.peer_cert_as_username = cn
|
||||||
|
|
||||||
## TCP backlog for the SSL connection.
|
## TCP backlog for the SSL connection.
|
||||||
|
@ -1512,7 +1512,7 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
|
||||||
|
|
||||||
## See: listener.ssl.$name.peer_cert_as_username
|
## See: listener.ssl.$name.peer_cert_as_username
|
||||||
##
|
##
|
||||||
## Value: cn | dn
|
## Value: cn | dn | crt
|
||||||
## listener.wss.external.peer_cert_as_username = cn
|
## listener.wss.external.peer_cert_as_username = cn
|
||||||
|
|
||||||
## TCP backlog for the WebSocket/SSL connection.
|
## TCP backlog for the WebSocket/SSL connection.
|
||||||
|
|
|
@ -922,7 +922,7 @@ end}.
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.tcp.$name.peer_cert_as_username", "emqx.listeners", [
|
{mapping, "listener.tcp.$name.peer_cert_as_username", "emqx.listeners", [
|
||||||
{datatype, {enum, [cn, dn]}}
|
{datatype, {enum, [cn, dn, crt]}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.tcp.$name.backlog", "emqx.listeners", [
|
{mapping, "listener.tcp.$name.backlog", "emqx.listeners", [
|
||||||
|
@ -1112,7 +1112,7 @@ end}.
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.ssl.$name.peer_cert_as_username", "emqx.listeners", [
|
{mapping, "listener.ssl.$name.peer_cert_as_username", "emqx.listeners", [
|
||||||
{datatype, {enum, [cn, dn]}}
|
{datatype, {enum, [cn, dn, crt]}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -1373,7 +1373,7 @@ end}.
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.wss.$name.peer_cert_as_username", "emqx.listeners", [
|
{mapping, "listener.wss.$name.peer_cert_as_username", "emqx.listeners", [
|
||||||
{datatype, {enum, [cn, dn]}}
|
{datatype, {enum, [cn, dn, crt]}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{translation, "emqx.listeners", fun(Conf) ->
|
{translation, "emqx.listeners", fun(Conf) ->
|
||||||
|
|
|
@ -102,8 +102,13 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
ensure_expiry_timer(State) ->
|
||||||
|
State#{expiry_timer := emqx_misc:start_timer(timer:seconds(2), expire)}.
|
||||||
|
-else.
|
||||||
ensure_expiry_timer(State) ->
|
ensure_expiry_timer(State) ->
|
||||||
State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}.
|
State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}.
|
||||||
|
-endif.
|
||||||
|
|
||||||
expire_banned_items(Now) ->
|
expire_banned_items(Now) ->
|
||||||
mnesia:foldl(fun
|
mnesia:foldl(fun
|
||||||
|
|
|
@ -260,9 +260,19 @@ subscription(Topic, Subscriber) ->
|
||||||
|
|
||||||
-spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()).
|
-spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()).
|
||||||
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
|
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
|
||||||
length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) >= 1;
|
case ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1) of
|
||||||
|
{Match, _} ->
|
||||||
|
length(Match) >= 1;
|
||||||
|
'$end_of_table' ->
|
||||||
|
false
|
||||||
|
end;
|
||||||
subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
|
subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
|
||||||
length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) >= 1;
|
case ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1) of
|
||||||
|
{Match, _} ->
|
||||||
|
length(Match) >= 1;
|
||||||
|
'$end_of_table' ->
|
||||||
|
false
|
||||||
|
end;
|
||||||
subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
|
subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
|
||||||
ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}).
|
ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}).
|
||||||
|
|
||||||
|
|
|
@ -75,4 +75,3 @@ compile(Rules) ->
|
||||||
{ok, MP} = re:compile(Re),
|
{ok, MP} = re:compile(Re),
|
||||||
{rewrite, Topic, MP, Dest}
|
{rewrite, Topic, MP, Dest}
|
||||||
end, Rules).
|
end, Rules).
|
||||||
|
|
||||||
|
|
|
@ -33,35 +33,34 @@
|
||||||
-export([shutdown/2]).
|
-export([shutdown/2]).
|
||||||
|
|
||||||
-record(pstate, {
|
-record(pstate, {
|
||||||
zone,
|
zone,
|
||||||
sendfun,
|
sendfun,
|
||||||
peername,
|
peername,
|
||||||
peercert,
|
peercert,
|
||||||
proto_ver,
|
proto_ver,
|
||||||
proto_name,
|
proto_name,
|
||||||
ackprops,
|
client_id,
|
||||||
client_id,
|
is_assigned,
|
||||||
is_assigned,
|
conn_pid,
|
||||||
conn_pid,
|
conn_props,
|
||||||
conn_props,
|
ack_props,
|
||||||
ack_props,
|
username,
|
||||||
username,
|
session,
|
||||||
session,
|
clean_start,
|
||||||
clean_start,
|
topic_aliases,
|
||||||
topic_aliases,
|
packet_size,
|
||||||
packet_size,
|
will_topic,
|
||||||
will_topic,
|
will_msg,
|
||||||
will_msg,
|
keepalive,
|
||||||
keepalive,
|
mountpoint,
|
||||||
mountpoint,
|
is_super,
|
||||||
is_super,
|
is_bridge,
|
||||||
is_bridge,
|
enable_ban,
|
||||||
enable_ban,
|
enable_acl,
|
||||||
enable_acl,
|
recv_stats,
|
||||||
recv_stats,
|
send_stats,
|
||||||
send_stats,
|
connected,
|
||||||
connected,
|
connected_at
|
||||||
connected_at
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type(state() :: #pstate{}).
|
-type(state() :: #pstate{}).
|
||||||
|
@ -71,6 +70,8 @@
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
|
-define(NO_PROPS, undefined).
|
||||||
|
|
||||||
-define(LOG(Level, Format, Args, PState),
|
-define(LOG(Level, Format, Args, PState),
|
||||||
emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format,
|
emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format,
|
||||||
[PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])).
|
[PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])).
|
||||||
|
@ -106,9 +107,10 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
|
||||||
|
|
||||||
init_username(Peercert, Options) ->
|
init_username(Peercert, Options) ->
|
||||||
case proplists:get_value(peer_cert_as_username, Options) of
|
case proplists:get_value(peer_cert_as_username, Options) of
|
||||||
cn -> esockd_peercert:common_name(Peercert);
|
cn -> esockd_peercert:common_name(Peercert);
|
||||||
dn -> esockd_peercert:subject(Peercert);
|
dn -> esockd_peercert:subject(Peercert);
|
||||||
_ -> undefined
|
crt -> Peercert;
|
||||||
|
_ -> undefined
|
||||||
end.
|
end.
|
||||||
|
|
||||||
set_username(Username, PState = #pstate{username = undefined}) ->
|
set_username(Username, PState = #pstate{username = undefined}) ->
|
||||||
|
@ -142,7 +144,6 @@ attrs(#pstate{zone = Zone,
|
||||||
proto_ver = ProtoVer,
|
proto_ver = ProtoVer,
|
||||||
proto_name = ProtoName,
|
proto_name = ProtoName,
|
||||||
keepalive = Keepalive,
|
keepalive = Keepalive,
|
||||||
will_topic = WillTopic,
|
|
||||||
mountpoint = Mountpoint,
|
mountpoint = Mountpoint,
|
||||||
is_super = IsSuper,
|
is_super = IsSuper,
|
||||||
is_bridge = IsBridge,
|
is_bridge = IsBridge,
|
||||||
|
@ -156,7 +157,6 @@ attrs(#pstate{zone = Zone,
|
||||||
{proto_name, ProtoName},
|
{proto_name, ProtoName},
|
||||||
{clean_start, CleanStart},
|
{clean_start, CleanStart},
|
||||||
{keepalive, Keepalive},
|
{keepalive, Keepalive},
|
||||||
{will_topic, WillTopic},
|
|
||||||
{mountpoint, Mountpoint},
|
{mountpoint, Mountpoint},
|
||||||
{is_super, IsSuper},
|
{is_super, IsSuper},
|
||||||
{is_bridge, IsBridge},
|
{is_bridge, IsBridge},
|
||||||
|
@ -284,14 +284,13 @@ process_packet(?CONNECT_PACKET(
|
||||||
clean_start = CleanStart,
|
clean_start = CleanStart,
|
||||||
keepalive = Keepalive,
|
keepalive = Keepalive,
|
||||||
properties = ConnProps,
|
properties = ConnProps,
|
||||||
will_topic = WillTopic,
|
|
||||||
client_id = ClientId,
|
client_id = ClientId,
|
||||||
username = Username,
|
username = Username,
|
||||||
password = Password} = Connect), PState) ->
|
password = Password} = Connect), PState) ->
|
||||||
|
|
||||||
%% TODO: Mountpoint...
|
%% TODO: Mountpoint...
|
||||||
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
|
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
|
||||||
WillMsg = emqx_packet:will_msg(Connect),
|
WillMsg = make_will_msg(Connect),
|
||||||
|
|
||||||
PState1 = set_username(Username,
|
PState1 = set_username(Username,
|
||||||
PState#pstate{client_id = ClientId,
|
PState#pstate{client_id = ClientId,
|
||||||
|
@ -300,7 +299,6 @@ process_packet(?CONNECT_PACKET(
|
||||||
clean_start = CleanStart,
|
clean_start = CleanStart,
|
||||||
keepalive = Keepalive,
|
keepalive = Keepalive,
|
||||||
conn_props = ConnProps,
|
conn_props = ConnProps,
|
||||||
will_topic = WillTopic,
|
|
||||||
will_msg = WillMsg,
|
will_msg = WillMsg,
|
||||||
is_bridge = IsBridge,
|
is_bridge = IsBridge,
|
||||||
connected_at = os:timestamp()}),
|
connected_at = os:timestamp()}),
|
||||||
|
@ -525,7 +523,6 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
|
||||||
'Wildcard-Subscription-Available' => flag(Wildcard),
|
'Wildcard-Subscription-Available' => flag(Wildcard),
|
||||||
'Subscription-Identifier-Available' => 1,
|
'Subscription-Identifier-Available' => 1,
|
||||||
'Response-Information' => ResponseInformation,
|
'Response-Information' => ResponseInformation,
|
||||||
|
|
||||||
'Shared-Subscription-Available' => flag(Shared)},
|
'Shared-Subscription-Available' => flag(Shared)},
|
||||||
|
|
||||||
Props1 = if
|
Props1 = if
|
||||||
|
@ -603,10 +600,10 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Assign a clientid
|
%% Assign a clientid
|
||||||
|
|
||||||
maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) ->
|
maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps}) ->
|
||||||
ClientId = emqx_guid:to_base62(emqx_guid:gen()),
|
ClientId = emqx_guid:to_base62(emqx_guid:gen()),
|
||||||
AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
|
AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
|
||||||
PState#pstate{client_id = ClientId, is_assigned = true, ackprops = AckProps1};
|
PState#pstate{client_id = ClientId, is_assigned = true, ack_props = AckProps1};
|
||||||
maybe_assign_client_id(PState) ->
|
maybe_assign_client_id(PState) ->
|
||||||
PState.
|
PState.
|
||||||
|
|
||||||
|
@ -614,14 +611,16 @@ try_open_session(PState = #pstate{zone = Zone,
|
||||||
client_id = ClientId,
|
client_id = ClientId,
|
||||||
conn_pid = ConnPid,
|
conn_pid = ConnPid,
|
||||||
username = Username,
|
username = Username,
|
||||||
clean_start = CleanStart}) ->
|
clean_start = CleanStart,
|
||||||
|
will_msg = WillMsg}) ->
|
||||||
|
|
||||||
SessAttrs = #{
|
SessAttrs = #{
|
||||||
zone => Zone,
|
zone => Zone,
|
||||||
client_id => ClientId,
|
client_id => ClientId,
|
||||||
conn_pid => ConnPid,
|
conn_pid => ConnPid,
|
||||||
username => Username,
|
username => Username,
|
||||||
clean_start => CleanStart
|
clean_start => CleanStart,
|
||||||
|
will_msg => WillMsg
|
||||||
},
|
},
|
||||||
|
|
||||||
SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]),
|
SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]),
|
||||||
|
@ -634,14 +633,14 @@ try_open_session(PState = #pstate{zone = Zone,
|
||||||
set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
|
set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
|
||||||
maps:put(max_inflight, if
|
maps:put(max_inflight, if
|
||||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||||
maps:get('Receive-Maximum', ConnProps, 65535);
|
get_property('Receive-Maximum', ConnProps, 65535);
|
||||||
true ->
|
true ->
|
||||||
emqx_zone:get_env(Zone, max_inflight, 65535)
|
emqx_zone:get_env(Zone, max_inflight, 65535)
|
||||||
end, SessAttrs);
|
end, SessAttrs);
|
||||||
set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) ->
|
set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) ->
|
||||||
maps:put(expiry_interval, if
|
maps:put(expiry_interval, if
|
||||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||||
maps:get('Session-Expiry-Interval', ConnProps, 0);
|
get_property('Session-Expiry-Interval', ConnProps, 0);
|
||||||
true ->
|
true ->
|
||||||
case CleanStart of
|
case CleanStart of
|
||||||
true -> 0;
|
true -> 0;
|
||||||
|
@ -652,7 +651,7 @@ set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, c
|
||||||
set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
|
set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
|
||||||
maps:put(topic_alias_maximum, if
|
maps:put(topic_alias_maximum, if
|
||||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||||
maps:get('Topic-Alias-Maximum', ConnProps, 0);
|
get_property('Topic-Alias-Maximum', ConnProps, 0);
|
||||||
true ->
|
true ->
|
||||||
emqx_zone:get_env(Zone, max_topic_alias, 0)
|
emqx_zone:get_env(Zone, max_topic_alias, 0)
|
||||||
end, SessAttrs);
|
end, SessAttrs);
|
||||||
|
@ -671,11 +670,26 @@ authenticate(Credentials, Password) ->
|
||||||
{error, Error}
|
{error, Error}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
set_property(Name, Value, undefined) ->
|
set_property(Name, Value, ?NO_PROPS) ->
|
||||||
#{Name => Value};
|
#{Name => Value};
|
||||||
set_property(Name, Value, Props) ->
|
set_property(Name, Value, Props) ->
|
||||||
Props#{Name => Value}.
|
Props#{Name => Value}.
|
||||||
|
|
||||||
|
get_property(_Name, undefined, Default) ->
|
||||||
|
Default;
|
||||||
|
get_property(Name, Props, Default) ->
|
||||||
|
maps:get(Name, Props, Default).
|
||||||
|
|
||||||
|
make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer,
|
||||||
|
will_props = WillProps} = Connect) ->
|
||||||
|
emqx_packet:will_msg(if
|
||||||
|
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||||
|
WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0),
|
||||||
|
Connect#mqtt_packet_connect{will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)};
|
||||||
|
true ->
|
||||||
|
Connect
|
||||||
|
end).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Check Packet
|
%% Check Packet
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -814,23 +828,11 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
|
||||||
Reason =:= discard ->
|
Reason =:= discard ->
|
||||||
emqx_cm:unregister_connection(ClientId);
|
emqx_cm:unregister_connection(ClientId);
|
||||||
shutdown(Reason, PState = #pstate{connected = true,
|
shutdown(Reason, PState = #pstate{connected = true,
|
||||||
client_id = ClientId,
|
client_id = ClientId}) ->
|
||||||
will_msg = WillMsg}) ->
|
|
||||||
?LOG(info, "Shutdown for ~p", [Reason], PState),
|
?LOG(info, "Shutdown for ~p", [Reason], PState),
|
||||||
_ = send_willmsg(WillMsg),
|
|
||||||
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
|
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
|
||||||
emqx_cm:unregister_connection(ClientId).
|
emqx_cm:unregister_connection(ClientId).
|
||||||
|
|
||||||
send_willmsg(undefined) ->
|
|
||||||
ignore;
|
|
||||||
send_willmsg(WillMsg = #message{topic = Topic,
|
|
||||||
headers = #{'Will-Delay-Interval' := Interval}})
|
|
||||||
when is_integer(Interval), Interval > 0 ->
|
|
||||||
SendAfter = integer_to_binary(Interval),
|
|
||||||
emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>});
|
|
||||||
send_willmsg(WillMsg) ->
|
|
||||||
emqx_broker:publish(WillMsg).
|
|
||||||
|
|
||||||
start_keepalive(0, _PState) ->
|
start_keepalive(0, _PState) ->
|
||||||
ignore;
|
ignore;
|
||||||
start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 ->
|
start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 ->
|
||||||
|
|
|
@ -47,7 +47,7 @@
|
||||||
-export([info/1, attrs/1]).
|
-export([info/1, attrs/1]).
|
||||||
-export([stats/1]).
|
-export([stats/1]).
|
||||||
-export([resume/2, discard/2]).
|
-export([resume/2, discard/2]).
|
||||||
-export([update_expiry_interval/2, update_misc/2]).
|
-export([update_expiry_interval/2]).
|
||||||
-export([subscribe/2, subscribe/4]).
|
-export([subscribe/2, subscribe/4]).
|
||||||
-export([publish/3]).
|
-export([publish/3]).
|
||||||
-export([puback/2, puback/3]).
|
-export([puback/2, puback/3]).
|
||||||
|
@ -147,7 +147,11 @@
|
||||||
%% Created at
|
%% Created at
|
||||||
created_at :: erlang:timestamp(),
|
created_at :: erlang:timestamp(),
|
||||||
|
|
||||||
topic_alias_maximum :: pos_integer()
|
topic_alias_maximum :: pos_integer(),
|
||||||
|
|
||||||
|
will_msg :: emqx:message(),
|
||||||
|
|
||||||
|
will_delay_timer :: reference() | undefined
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type(spid() :: pid()).
|
-type(spid() :: pid()).
|
||||||
|
@ -307,9 +311,9 @@ unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
|
||||||
UnsubReq = {PacketId, Properties, TopicFilters},
|
UnsubReq = {PacketId, Properties, TopicFilters},
|
||||||
gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
|
gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
|
||||||
|
|
||||||
-spec(resume(spid(), pid()) -> ok).
|
-spec(resume(spid(), map()) -> ok).
|
||||||
resume(SPid, ConnPid) ->
|
resume(SPid, SessAttrs) ->
|
||||||
gen_server:cast(SPid, {resume, ConnPid}).
|
gen_server:cast(SPid, {resume, SessAttrs}).
|
||||||
|
|
||||||
%% @doc Discard the session
|
%% @doc Discard the session
|
||||||
-spec(discard(spid(), ByPid :: pid()) -> ok).
|
-spec(discard(spid(), ByPid :: pid()) -> ok).
|
||||||
|
@ -320,9 +324,6 @@ discard(SPid, ByPid) ->
|
||||||
update_expiry_interval(SPid, Interval) ->
|
update_expiry_interval(SPid, Interval) ->
|
||||||
gen_server:cast(SPid, {expiry_interval, Interval}).
|
gen_server:cast(SPid, {expiry_interval, Interval}).
|
||||||
|
|
||||||
update_misc(SPid, Misc) ->
|
|
||||||
gen_server:cast(SPid, {update_misc, Misc}).
|
|
||||||
|
|
||||||
-spec(close(spid()) -> ok).
|
-spec(close(spid()) -> ok).
|
||||||
close(SPid) ->
|
close(SPid) ->
|
||||||
gen_server:call(SPid, close, infinity).
|
gen_server:call(SPid, close, infinity).
|
||||||
|
@ -338,7 +339,8 @@ init([Parent, #{zone := Zone,
|
||||||
clean_start := CleanStart,
|
clean_start := CleanStart,
|
||||||
expiry_interval := ExpiryInterval,
|
expiry_interval := ExpiryInterval,
|
||||||
max_inflight := MaxInflight,
|
max_inflight := MaxInflight,
|
||||||
topic_alias_maximum := TopicAliasMaximum}]) ->
|
topic_alias_maximum := TopicAliasMaximum,
|
||||||
|
will_msg := WillMsg}]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
true = link(ConnPid),
|
true = link(ConnPid),
|
||||||
IdleTimout = get_env(Zone, idle_timeout, 30000),
|
IdleTimout = get_env(Zone, idle_timeout, 30000),
|
||||||
|
@ -362,7 +364,8 @@ init([Parent, #{zone := Zone,
|
||||||
deliver_stats = 0,
|
deliver_stats = 0,
|
||||||
enqueue_stats = 0,
|
enqueue_stats = 0,
|
||||||
created_at = os:timestamp(),
|
created_at = os:timestamp(),
|
||||||
topic_alias_maximum = TopicAliasMaximum
|
topic_alias_maximum = TopicAliasMaximum,
|
||||||
|
will_msg = WillMsg
|
||||||
},
|
},
|
||||||
emqx_sm:register_session(ClientId, attrs(State)),
|
emqx_sm:register_session(ClientId, attrs(State)),
|
||||||
emqx_sm:set_session_stats(ClientId, stats(State)),
|
emqx_sm:set_session_stats(ClientId, stats(State)),
|
||||||
|
@ -511,17 +514,22 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% RESUME:
|
%% RESUME:
|
||||||
handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
|
handle_cast({resume, #{conn_pid := ConnPid,
|
||||||
conn_pid = OldConnPid,
|
will_msg := WillMsg,
|
||||||
clean_start = CleanStart,
|
expiry_interval := SessionExpiryInterval,
|
||||||
retry_timer = RetryTimer,
|
max_inflight := MaxInflight,
|
||||||
await_rel_timer = AwaitTimer,
|
topic_alias_maximum := TopicAliasMaximum}}, State = #state{client_id = ClientId,
|
||||||
expiry_timer = ExpireTimer}) ->
|
conn_pid = OldConnPid,
|
||||||
|
clean_start = CleanStart,
|
||||||
|
retry_timer = RetryTimer,
|
||||||
|
await_rel_timer = AwaitTimer,
|
||||||
|
expiry_timer = ExpireTimer,
|
||||||
|
will_delay_timer = WillDelayTimer}) ->
|
||||||
|
|
||||||
?LOG(info, "Resumed by connection ~p ", [ConnPid], State),
|
?LOG(info, "Resumed by connection ~p ", [ConnPid], State),
|
||||||
|
|
||||||
%% Cancel Timers
|
%% Cancel Timers
|
||||||
lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]),
|
lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]),
|
||||||
|
|
||||||
case kick(ClientId, OldConnPid, ConnPid) of
|
case kick(ClientId, OldConnPid, ConnPid) of
|
||||||
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State);
|
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State);
|
||||||
|
@ -530,14 +538,19 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
|
||||||
|
|
||||||
true = link(ConnPid),
|
true = link(ConnPid),
|
||||||
|
|
||||||
State1 = State#state{conn_pid = ConnPid,
|
State1 = State#state{conn_pid = ConnPid,
|
||||||
binding = binding(ConnPid),
|
binding = binding(ConnPid),
|
||||||
old_conn_pid = OldConnPid,
|
old_conn_pid = OldConnPid,
|
||||||
clean_start = false,
|
clean_start = false,
|
||||||
retry_timer = undefined,
|
retry_timer = undefined,
|
||||||
awaiting_rel = #{},
|
awaiting_rel = #{},
|
||||||
await_rel_timer = undefined,
|
await_rel_timer = undefined,
|
||||||
expiry_timer = undefined},
|
expiry_timer = undefined,
|
||||||
|
expiry_interval = SessionExpiryInterval,
|
||||||
|
inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight),
|
||||||
|
topic_alias_maximum = TopicAliasMaximum,
|
||||||
|
will_delay_timer = undefined,
|
||||||
|
will_msg = WillMsg},
|
||||||
|
|
||||||
%% Clean Session: true -> false???
|
%% Clean Session: true -> false???
|
||||||
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
|
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
|
||||||
|
@ -550,10 +563,6 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
|
||||||
handle_cast({expiry_interval, Interval}, State) ->
|
handle_cast({expiry_interval, Interval}, State) ->
|
||||||
{noreply, State#state{expiry_interval = Interval}};
|
{noreply, State#state{expiry_interval = Interval}};
|
||||||
|
|
||||||
handle_cast({update_misc, #{max_inflight := MaxInflight, topic_alias_maximum := TopicAliasMaximum}}, State) ->
|
|
||||||
{noreply, State#state{inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight),
|
|
||||||
topic_alias_maximum = TopicAliasMaximum}};
|
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
|
emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -612,11 +621,17 @@ handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
|
||||||
?LOG(info, "expired, shutdown now:(", [], State),
|
?LOG(info, "expired, shutdown now:(", [], State),
|
||||||
shutdown(expired, State);
|
shutdown(expired, State);
|
||||||
|
|
||||||
handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) ->
|
handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) ->
|
||||||
{stop, Reason, State#state{conn_pid = undefined}};
|
send_willmsg(WillMsg),
|
||||||
|
{noreply, State#state{will_msg = undefined}};
|
||||||
|
|
||||||
|
handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) ->
|
||||||
|
send_willmsg(WillMsg),
|
||||||
|
{stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};
|
||||||
|
|
||||||
handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
|
handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||||
{noreply, ensure_expire_timer(State#state{conn_pid = undefined})};
|
State1 = ensure_will_delay_timer(State),
|
||||||
|
{noreply, ensure_expire_timer(State1#state{conn_pid = undefined})};
|
||||||
|
|
||||||
handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
|
handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
|
||||||
%% ignore
|
%% ignore
|
||||||
|
@ -631,8 +646,9 @@ handle_info(Info, State) ->
|
||||||
emqx_logger:error("[Session] unexpected info: ~p", [Info]),
|
emqx_logger:error("[Session] unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(Reason, #state{client_id = ClientId, conn_pid = ConnPid}) ->
|
terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, conn_pid = ConnPid}) ->
|
||||||
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
|
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
|
||||||
|
send_willmsg(WillMsg),
|
||||||
%% Ensure to shutdown the connection
|
%% Ensure to shutdown the connection
|
||||||
if
|
if
|
||||||
ConnPid =/= undefined ->
|
ConnPid =/= undefined ->
|
||||||
|
@ -714,6 +730,14 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
|
||||||
ensure_retry_timer(Interval - max(0, Age), State)
|
ensure_retry_timer(Interval - max(0, Age), State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Send Will Message
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
send_willmsg(undefined) ->
|
||||||
|
ignore;
|
||||||
|
send_willmsg(WillMsg) ->
|
||||||
|
emqx_broker:publish(WillMsg).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Expire Awaiting Rel
|
%% Expire Awaiting Rel
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -899,6 +923,11 @@ ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval >
|
||||||
ensure_expire_timer(State) ->
|
ensure_expire_timer(State) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
|
ensure_will_delay_timer(State = #state{will_msg = #message{headers = #{'Will-Delay-Interval' := WillDelayInterval}}}) ->
|
||||||
|
State#state{will_delay_timer = emqx_misc:start_timer(WillDelayInterval * 1000, will_delay)};
|
||||||
|
ensure_will_delay_timer(State) ->
|
||||||
|
State.
|
||||||
|
|
||||||
ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined,
|
ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined,
|
||||||
idle_timeout = IdleTimeout}) ->
|
idle_timeout = IdleTimeout}) ->
|
||||||
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
|
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
|
|
||||||
-export([open_session/1, close_session/1]).
|
-export([open_session/1, close_session/1]).
|
||||||
-export([lookup_session/1, lookup_session_pid/1]).
|
-export([lookup_session/1, lookup_session_pid/1]).
|
||||||
-export([resume_session/1, resume_session/2]).
|
-export([resume_session/2]).
|
||||||
-export([discard_session/1, discard_session/2]).
|
-export([discard_session/1, discard_session/2]).
|
||||||
-export([register_session/2, unregister_session/1]).
|
-export([register_session/2, unregister_session/1]).
|
||||||
-export([get_session_attrs/1, set_session_attrs/2]).
|
-export([get_session_attrs/1, set_session_attrs/2]).
|
||||||
|
@ -59,15 +59,13 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid
|
||||||
end,
|
end,
|
||||||
emqx_sm_locker:trans(ClientId, CleanStart);
|
emqx_sm_locker:trans(ClientId, CleanStart);
|
||||||
|
|
||||||
open_session(SessAttrs = #{clean_start := false,
|
open_session(SessAttrs = #{clean_start := false,
|
||||||
client_id := ClientId,
|
client_id := ClientId,
|
||||||
conn_pid := ConnPid,
|
max_inflight := MaxInflight,
|
||||||
max_inflight := MaxInflight,
|
topic_alias_maximum := TopicAliasMaximum}) ->
|
||||||
topic_alias_maximum := TopicAliasMaximum}) ->
|
|
||||||
ResumeStart = fun(_) ->
|
ResumeStart = fun(_) ->
|
||||||
case resume_session(ClientId, ConnPid) of
|
case resume_session(ClientId, SessAttrs) of
|
||||||
{ok, SPid} ->
|
{ok, SPid} ->
|
||||||
emqx_session:update_misc(SPid, #{max_inflight => MaxInflight, topic_alias_maximum => TopicAliasMaximum}),
|
|
||||||
{ok, SPid, true};
|
{ok, SPid, true};
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
emqx_session_sup:start_session(SessAttrs)
|
emqx_session_sup:start_session(SessAttrs)
|
||||||
|
@ -90,15 +88,12 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
|
||||||
end, lookup_session(ClientId)).
|
end, lookup_session(ClientId)).
|
||||||
|
|
||||||
%% @doc Try to resume a session.
|
%% @doc Try to resume a session.
|
||||||
-spec(resume_session(emqx_types:client_id()) -> {ok, pid()} | {error, term()}).
|
-spec(resume_session(emqx_types:client_id(), map()) -> {ok, pid()} | {error, term()}).
|
||||||
resume_session(ClientId) ->
|
resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) ->
|
||||||
resume_session(ClientId, self()).
|
|
||||||
|
|
||||||
resume_session(ClientId, ConnPid) ->
|
|
||||||
case lookup_session(ClientId) of
|
case lookup_session(ClientId) of
|
||||||
[] -> {error, not_found};
|
[] -> {error, not_found};
|
||||||
[{_ClientId, SPid}] ->
|
[{_ClientId, SPid}] ->
|
||||||
ok = emqx_session:resume(SPid, ConnPid),
|
ok = emqx_session:resume(SPid, SessAttrs),
|
||||||
{ok, SPid};
|
{ok, SPid};
|
||||||
Sessions ->
|
Sessions ->
|
||||||
[{_, SPid}|StaleSessions] = lists:reverse(Sessions),
|
[{_, SPid}|StaleSessions] = lists:reverse(Sessions),
|
||||||
|
@ -106,7 +101,7 @@ resume_session(ClientId, ConnPid) ->
|
||||||
lists:foreach(fun({_, StalePid}) ->
|
lists:foreach(fun({_, StalePid}) ->
|
||||||
catch emqx_session:discard(StalePid, ConnPid)
|
catch emqx_session:discard(StalePid, ConnPid)
|
||||||
end, StaleSessions),
|
end, StaleSessions),
|
||||||
ok = emqx_session:resume(SPid, ConnPid),
|
ok = emqx_session:resume(SPid, SessAttrs),
|
||||||
{ok, SPid}
|
{ok, SPid}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,13 @@
|
||||||
clean_start = false,
|
clean_start = false,
|
||||||
password = <<"public">>})).
|
password = <<"public">>})).
|
||||||
|
|
||||||
|
-define(CLIENT3, ?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
username = <<"admin">>,
|
||||||
|
proto_ver = ?MQTT_PROTO_V5,
|
||||||
|
clean_start = false,
|
||||||
|
password = <<"public">>,
|
||||||
|
will_props = #{'Will-Delay-Interval' => 2}})).
|
||||||
|
|
||||||
-define(SUBCODE, [0]).
|
-define(SUBCODE, [0]).
|
||||||
|
|
||||||
-define(PACKETID, 1).
|
-define(PACKETID, 1).
|
||||||
|
@ -66,6 +73,7 @@ groups() ->
|
||||||
[
|
[
|
||||||
mqtt_connect,
|
mqtt_connect,
|
||||||
mqtt_connect_with_tcp,
|
mqtt_connect_with_tcp,
|
||||||
|
mqtt_connect_with_will_props,
|
||||||
mqtt_connect_with_ssl_oneway,
|
mqtt_connect_with_ssl_oneway,
|
||||||
mqtt_connect_with_ssl_twoway,
|
mqtt_connect_with_ssl_twoway,
|
||||||
mqtt_connect_with_ws
|
mqtt_connect_with_ws
|
||||||
|
@ -109,6 +117,14 @@ mqtt_connect_with_tcp(_) ->
|
||||||
{ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data),
|
{ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data),
|
||||||
emqx_client_sock:close(Sock).
|
emqx_client_sock:close(Sock).
|
||||||
|
|
||||||
|
mqtt_connect_with_will_props(_) ->
|
||||||
|
%% Issue #599
|
||||||
|
%% Empty clientId and clean_session = false
|
||||||
|
{ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000),
|
||||||
|
Packet = raw_send_serialize(?CLIENT3),
|
||||||
|
emqx_client_sock:send(Sock, Packet),
|
||||||
|
emqx_client_sock:close(Sock).
|
||||||
|
|
||||||
mqtt_connect_with_ssl_oneway(_) ->
|
mqtt_connect_with_ssl_oneway(_) ->
|
||||||
emqx:shutdown(),
|
emqx:shutdown(),
|
||||||
emqx_ct_broker_helpers:change_opts(ssl_oneway),
|
emqx_ct_broker_helpers:change_opts(ssl_oneway),
|
||||||
|
|
|
@ -29,13 +29,17 @@ t_banned_all(_) ->
|
||||||
emqx_ct_broker_helpers:run_setup_steps(),
|
emqx_ct_broker_helpers:run_setup_steps(),
|
||||||
emqx_banned:start_link(),
|
emqx_banned:start_link(),
|
||||||
TimeNow = erlang:system_time(second),
|
TimeNow = erlang:system_time(second),
|
||||||
ok = emqx_banned:add(#banned{who = {client_id, <<"TestClient">>},
|
Banned = #banned{who = {client_id, <<"TestClient">>},
|
||||||
reason = <<"test">>,
|
reason = <<"test">>,
|
||||||
by = <<"banned suite">>,
|
by = <<"banned suite">>,
|
||||||
desc = <<"test">>,
|
desc = <<"test">>,
|
||||||
until = TimeNow + 10}),
|
until = TimeNow + 1},
|
||||||
|
ok = emqx_banned:add(Banned),
|
||||||
% here is not expire banned test because its check interval is greater than 5 mins, but its effect has been confirmed
|
% here is not expire banned test because its check interval is greater than 5 mins, but its effect has been confirmed
|
||||||
timer:sleep(100),
|
?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
|
||||||
|
timer:sleep(2500),
|
||||||
|
?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
|
||||||
|
ok = emqx_banned:add(Banned),
|
||||||
?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
|
?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
|
||||||
emqx_banned:del({client_id, <<"TestClient">>}),
|
emqx_banned:del({client_id, <<"TestClient">>}),
|
||||||
?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
|
?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
|
||||||
|
-module(emqx_bridge_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
[bridge_test].
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
emqx_ct_broker_helpers:run_setup_steps(),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
emqx_ct_broker_helpers:run_teardown_steps().
|
||||||
|
|
||||||
|
bridge_test(_) ->
|
||||||
|
{ok, _Pid} = emqx_bridge:start_link(emqx, []),
|
||||||
|
#{msg := <<"start bridge successfully">>}
|
||||||
|
= emqx_bridge:start_bridge(emqx),
|
||||||
|
test_forwards(),
|
||||||
|
test_subscriptions(0),
|
||||||
|
test_subscriptions(1),
|
||||||
|
test_subscriptions(2),
|
||||||
|
#{msg := <<"stop bridge successfully">>}
|
||||||
|
= emqx_bridge:stop_bridge(emqx),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
test_forwards() ->
|
||||||
|
emqx_bridge:add_forward(emqx, <<"test_forwards">>),
|
||||||
|
[<<"test_forwards">>] = emqx_bridge:show_forwards(emqx),
|
||||||
|
emqx_bridge:del_forward(emqx, <<"test_forwards">>),
|
||||||
|
[] = emqx_bridge:show_forwards(emqx),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
test_subscriptions(QoS) ->
|
||||||
|
emqx_bridge:add_subscription(emqx, <<"test_subscriptions">>, QoS),
|
||||||
|
[{<<"test_subscriptions">>, QoS}] = emqx_bridge:show_subscriptions(emqx),
|
||||||
|
emqx_bridge:del_subscription(emqx, <<"test_subscriptions">>),
|
||||||
|
[] = emqx_bridge:show_subscriptions(emqx),
|
||||||
|
ok.
|
|
@ -60,6 +60,11 @@ subscribe_unsubscribe(_) ->
|
||||||
ok = emqx:subscribe(<<"topic">>, <<"clientId">>),
|
ok = emqx:subscribe(<<"topic">>, <<"clientId">>),
|
||||||
ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, #{ qos => 1 }),
|
ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, #{ qos => 1 }),
|
||||||
ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }),
|
ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }),
|
||||||
|
true = emqx:subscribed(<<"topic">>, <<"clientId">>),
|
||||||
|
Topics = emqx:topics(),
|
||||||
|
lists:foreach(fun(Topic) ->
|
||||||
|
?assert(lists:member(Topic, Topics))
|
||||||
|
end, Topics),
|
||||||
ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),
|
ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),
|
||||||
ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>),
|
ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>),
|
||||||
ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>).
|
ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>).
|
||||||
|
@ -72,12 +77,16 @@ publish(_) ->
|
||||||
?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
|
?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
|
||||||
|
|
||||||
pubsub(_) ->
|
pubsub(_) ->
|
||||||
|
true = emqx:is_running(node()),
|
||||||
Self = self(),
|
Self = self(),
|
||||||
Subscriber = {Self, <<"clientId">>},
|
Subscriber = {Self, <<"clientId">>},
|
||||||
ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 1 }),
|
ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 1 }),
|
||||||
#{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
|
#{qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
|
||||||
|
#{qos := 1} = emqx:get_subopts(<<"a/b/c">>, Subscriber),
|
||||||
|
true = emqx:set_subopts(<<"a/b/c">>, Subscriber, #{qos => 0}),
|
||||||
|
#{qos := 0} = emqx:get_subopts(<<"a/b/c">>, Subscriber),
|
||||||
ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }),
|
ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }),
|
||||||
#{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
|
#{qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
|
||||||
%% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
|
%% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
|
||||||
timer:sleep(10),
|
timer:sleep(10),
|
||||||
[{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber),
|
[{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber),
|
||||||
|
|
|
@ -1,47 +0,0 @@
|
||||||
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
|
|
||||||
-module(emqx_connection_SUITE).
|
|
||||||
|
|
||||||
-compile(export_all).
|
|
||||||
-compile(nowarn_export_all).
|
|
||||||
|
|
||||||
-include_lib("common_test/include/ct.hrl").
|
|
||||||
|
|
||||||
all() ->
|
|
||||||
[{group, connection}].
|
|
||||||
|
|
||||||
groups() ->
|
|
||||||
[{connection, [sequence], [t_attrs]}].
|
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
|
||||||
emqx_ct_broker_helpers:run_setup_steps(),
|
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
|
||||||
emqx_ct_broker_helpers:run_teardown_steps().
|
|
||||||
|
|
||||||
|
|
||||||
t_attrs(_) ->
|
|
||||||
{ok, C, _} = emqx_client:start_link([{host, "localhost"}, {client_id, <<"simpleClient">>}, {username, <<"plain">>}, {password, <<"plain">>}]),
|
|
||||||
[{<<"simpleClient">>, ConnPid}] = emqx_cm:lookup_connection(<<"simpleClient">>),
|
|
||||||
Attrs = emqx_connection:attrs(ConnPid),
|
|
||||||
<<"simpleClient">> = proplists:get_value(client_id, Attrs),
|
|
||||||
<<"plain">> = proplists:get_value(username, Attrs),
|
|
||||||
emqx_client:disconnect(C).
|
|
||||||
|
|
||||||
%% t_stats() ->
|
|
||||||
%% {ok, C, _ } = emqx_client;
|
|
||||||
%% t_stats() ->
|
|
||||||
|
|
|
@ -46,14 +46,15 @@ init([ClientId]) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
|
handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
|
||||||
Attrs = #{ zone => Zone,
|
Attrs = #{ zone => Zone,
|
||||||
client_id => ClientId,
|
client_id => ClientId,
|
||||||
conn_pid => ClientPid,
|
conn_pid => ClientPid,
|
||||||
clean_start => true,
|
clean_start => true,
|
||||||
username => undefined,
|
username => undefined,
|
||||||
expiry_interval => 0,
|
expiry_interval => 0,
|
||||||
max_inflight => 0,
|
max_inflight => 0,
|
||||||
topic_alias_maximum => 0
|
topic_alias_maximum => 0,
|
||||||
|
will_msg => undefined
|
||||||
},
|
},
|
||||||
{ok, SessPid} = emqx_sm:open_session(Attrs),
|
{ok, SessPid} = emqx_sm:open_session(Attrs),
|
||||||
{reply, {ok, SessPid},
|
{reply, {ok, SessPid},
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
|
||||||
|
-module(emqx_mod_rewrite_tests).
|
||||||
|
|
||||||
|
-include_lib("emqx.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
|
||||||
|
rules() ->
|
||||||
|
Rawrules1 = "x/# ^x/y/(.+)$ z/y/$1",
|
||||||
|
Rawrules2 = "y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2",
|
||||||
|
Rawrules = [Rawrules1, Rawrules2],
|
||||||
|
Rules = lists:map(fun(Rule) ->
|
||||||
|
[Topic, Re, Dest] = string:tokens(Rule, " "),
|
||||||
|
{rewrite,
|
||||||
|
list_to_binary(Topic),
|
||||||
|
list_to_binary(Re),
|
||||||
|
list_to_binary(Dest)}
|
||||||
|
end, Rawrules),
|
||||||
|
lists:map(fun({rewrite, Topic, Re, Dest}) ->
|
||||||
|
{ok, MP} = re:compile(Re),
|
||||||
|
{rewrite, Topic, MP, Dest}
|
||||||
|
end, Rules).
|
||||||
|
|
||||||
|
rewrite_subscribe_test() ->
|
||||||
|
Rules = rules(),
|
||||||
|
io:format("Rules: ~p",[Rules]),
|
||||||
|
?assertEqual({ok, [{<<"test">>, opts}]},
|
||||||
|
emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"test">>, opts}], Rules)),
|
||||||
|
?assertEqual({ok, [{<<"z/y/test">>, opts}]},
|
||||||
|
emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"x/y/test">>, opts}], Rules)),
|
||||||
|
?assertEqual({ok, [{<<"y/z/test_topic">>, opts}]},
|
||||||
|
emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"y/test/z/test_topic">>, opts}], Rules)).
|
||||||
|
|
||||||
|
rewrite_unsubscribe_test() ->
|
||||||
|
Rules = rules(),
|
||||||
|
?assertEqual({ok, [{<<"test">>, opts}]},
|
||||||
|
emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"test">>, opts}], Rules)),
|
||||||
|
?assertEqual({ok, [{<<"z/y/test">>, opts}]},
|
||||||
|
emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"x/y/test">>, opts}], Rules)),
|
||||||
|
?assertEqual({ok, [{<<"y/z/test_topic">>, opts}]},
|
||||||
|
emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"y/test/z/test_topic">>, opts}], Rules)).
|
||||||
|
|
||||||
|
rewrite_publish_test() ->
|
||||||
|
Rules = rules(),
|
||||||
|
?assertMatch({ok, #message{topic = <<"test">>}},
|
||||||
|
emqx_mod_rewrite:rewrite_publish(#message{topic = <<"test">>}, Rules)),
|
||||||
|
?assertMatch({ok, #message{topic = <<"z/y/test">>}},
|
||||||
|
emqx_mod_rewrite:rewrite_publish(#message{topic = <<"x/y/test">>}, Rules)),
|
||||||
|
?assertMatch({ok, #message{topic = <<"y/z/test_topic">>}},
|
||||||
|
emqx_mod_rewrite:rewrite_publish(#message{topic = <<"y/test/z/test_topic">>}, Rules)).
|
|
@ -22,7 +22,7 @@
|
||||||
|
|
||||||
-define(PQ, emqx_pqueue).
|
-define(PQ, emqx_pqueue).
|
||||||
|
|
||||||
all() -> [t_priority_queue_plen, t_priority_queue_out2].
|
all() -> [t_priority_queue_plen, t_priority_queue_out2, t_priority_queues].
|
||||||
|
|
||||||
t_priority_queue_plen(_) ->
|
t_priority_queue_plen(_) ->
|
||||||
Q = ?PQ:new(),
|
Q = ?PQ:new(),
|
||||||
|
@ -67,3 +67,57 @@ t_priority_queue_out2(_) ->
|
||||||
{Val5, Q6} = ?PQ:out(Q5),
|
{Val5, Q6} = ?PQ:out(Q5),
|
||||||
{value, a} = Val5,
|
{value, a} = Val5,
|
||||||
{empty, _Q7} = ?PQ:out(Q6).
|
{empty, _Q7} = ?PQ:out(Q6).
|
||||||
|
|
||||||
|
t_priority_queues(_) ->
|
||||||
|
Q0 = ?PQ:new(),
|
||||||
|
Q1 = ?PQ:new(),
|
||||||
|
PQueue = {pqueue, [{0, Q0}, {1, Q1}]},
|
||||||
|
?assert(?PQ:is_queue(PQueue)),
|
||||||
|
[] = ?PQ:to_list(PQueue),
|
||||||
|
|
||||||
|
PQueue1 = ?PQ:in(a, 0, ?PQ:new()),
|
||||||
|
PQueue2 = ?PQ:in(b, 0, PQueue1),
|
||||||
|
|
||||||
|
PQueue3 = ?PQ:in(c, 1, PQueue2),
|
||||||
|
PQueue4 = ?PQ:in(d, 1, PQueue3),
|
||||||
|
|
||||||
|
4 = ?PQ:len(PQueue4),
|
||||||
|
|
||||||
|
[{1, c}, {1, d}, {0, a}, {0, b}] = ?PQ:to_list(PQueue4),
|
||||||
|
PQueue4 = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]),
|
||||||
|
|
||||||
|
empty = ?PQ:highest(?PQ:new()),
|
||||||
|
0 = ?PQ:highest(PQueue1),
|
||||||
|
1 = ?PQ:highest(PQueue4),
|
||||||
|
|
||||||
|
PQueue5 = ?PQ:in(e, infinity, PQueue4),
|
||||||
|
PQueue6 = ?PQ:in(f, 1, PQueue5),
|
||||||
|
|
||||||
|
{{value, e}, PQueue7} = ?PQ:out(PQueue6),
|
||||||
|
{empty, _} = ?PQ:out(0, ?PQ:new()),
|
||||||
|
|
||||||
|
{empty, Q0} = ?PQ:out_p(Q0),
|
||||||
|
|
||||||
|
Q2 = ?PQ:in(a, Q0),
|
||||||
|
Q3 = ?PQ:in(b, Q2),
|
||||||
|
Q4 = ?PQ:in(c, Q3),
|
||||||
|
|
||||||
|
{{value, a, 0}, _Q5} = ?PQ:out_p(Q4),
|
||||||
|
|
||||||
|
{{value,c,1}, PQueue8} = ?PQ:out_p(PQueue7),
|
||||||
|
|
||||||
|
Q4 = ?PQ:join(Q4, ?PQ:new()),
|
||||||
|
Q4 = ?PQ:join(?PQ:new(), Q4),
|
||||||
|
|
||||||
|
{queue, [a], [a], 2} = ?PQ:join(Q2, Q2),
|
||||||
|
|
||||||
|
{pqueue,[{-1,{queue,[f],[d],2}},
|
||||||
|
{0,{queue,[a],[a,b],3}}]} = ?PQ:join(PQueue8, Q2),
|
||||||
|
|
||||||
|
{pqueue,[{-1,{queue,[f],[d],2}},
|
||||||
|
{0,{queue,[b],[a,a],3}}]} = ?PQ:join(Q2, PQueue8),
|
||||||
|
|
||||||
|
{pqueue,[{-1,{queue,[f],[d,f,d],4}},
|
||||||
|
{0,{queue,[b],[a,b,a],4}}]} = ?PQ:join(PQueue8, PQueue8).
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -131,6 +131,125 @@ connect_v5(_) ->
|
||||||
#{'Response-Information' := _RespInfo}), _} =
|
#{'Response-Information' := _RespInfo}), _} =
|
||||||
raw_recv_parse(Data, ?MQTT_PROTO_V5)
|
raw_recv_parse(Data, ?MQTT_PROTO_V5)
|
||||||
end),
|
end),
|
||||||
|
|
||||||
|
% test clean start
|
||||||
|
with_connection(fun(Sock) ->
|
||||||
|
emqx_client_sock:send(Sock,
|
||||||
|
raw_send_serialize(
|
||||||
|
?CONNECT_PACKET(
|
||||||
|
#mqtt_packet_connect{
|
||||||
|
proto_ver = ?MQTT_PROTO_V5,
|
||||||
|
clean_start = true,
|
||||||
|
client_id = <<"myclient">>,
|
||||||
|
properties =
|
||||||
|
#{'Session-Expiry-Interval' => 10}})
|
||||||
|
)),
|
||||||
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||||
|
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||||
|
emqx_client_sock:send(Sock, raw_send_serialize(
|
||||||
|
?DISCONNECT_PACKET(?RC_SUCCESS)
|
||||||
|
))
|
||||||
|
end),
|
||||||
|
|
||||||
|
timer:sleep(1000),
|
||||||
|
|
||||||
|
with_connection(fun(Sock) ->
|
||||||
|
emqx_client_sock:send(Sock,
|
||||||
|
raw_send_serialize(
|
||||||
|
?CONNECT_PACKET(
|
||||||
|
#mqtt_packet_connect{
|
||||||
|
proto_ver = ?MQTT_PROTO_V5,
|
||||||
|
clean_start = false,
|
||||||
|
client_id = <<"myclient">>,
|
||||||
|
properties =
|
||||||
|
#{'Session-Expiry-Interval' => 10}})
|
||||||
|
)),
|
||||||
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||||
|
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
|
||||||
|
end),
|
||||||
|
|
||||||
|
% test will message publish and cancel
|
||||||
|
with_connection(fun(Sock) ->
|
||||||
|
emqx_client_sock:send(Sock,
|
||||||
|
raw_send_serialize(
|
||||||
|
?CONNECT_PACKET(
|
||||||
|
#mqtt_packet_connect{
|
||||||
|
proto_ver = ?MQTT_PROTO_V5,
|
||||||
|
clean_start = true,
|
||||||
|
client_id = <<"myclient">>,
|
||||||
|
will_flag = true,
|
||||||
|
will_qos = ?QOS_1,
|
||||||
|
will_retain = false,
|
||||||
|
will_props = #{'Will-Delay-Interval' => 5},
|
||||||
|
will_topic = <<"TopicA">>,
|
||||||
|
will_payload = <<"will message">>,
|
||||||
|
properties = #{'Session-Expiry-Interval' => 3}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||||
|
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
|
||||||
|
|
||||||
|
{ok, Sock2} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
|
||||||
|
[binary, {packet, raw},
|
||||||
|
{active, false}], 3000),
|
||||||
|
|
||||||
|
do_connect(Sock2, ?MQTT_PROTO_V5),
|
||||||
|
|
||||||
|
emqx_client_sock:send(Sock2, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
|
||||||
|
qos => ?QOS_2,
|
||||||
|
rap => 0,
|
||||||
|
nl => 0,
|
||||||
|
rc => 0}}]),
|
||||||
|
#{version => ?MQTT_PROTO_V5})),
|
||||||
|
|
||||||
|
{ok, SubData} = gen_tcp:recv(Sock2, 0),
|
||||||
|
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
|
||||||
|
|
||||||
|
emqx_client_sock:send(Sock, raw_send_serialize(
|
||||||
|
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
|
{error, timeout} = gen_tcp:recv(Sock2, 0, 1000),
|
||||||
|
|
||||||
|
% session resumed
|
||||||
|
{ok, Sock3} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
|
||||||
|
[binary, {packet, raw},
|
||||||
|
{active, false}], 3000),
|
||||||
|
|
||||||
|
emqx_client_sock:send(Sock3,
|
||||||
|
raw_send_serialize(
|
||||||
|
?CONNECT_PACKET(
|
||||||
|
#mqtt_packet_connect{
|
||||||
|
proto_ver = ?MQTT_PROTO_V5,
|
||||||
|
clean_start = false,
|
||||||
|
client_id = <<"myclient">>,
|
||||||
|
will_flag = true,
|
||||||
|
will_qos = ?QOS_1,
|
||||||
|
will_retain = false,
|
||||||
|
will_props = #{'Will-Delay-Interval' => 5},
|
||||||
|
will_topic = <<"TopicA">>,
|
||||||
|
will_payload = <<"will message 2">>,
|
||||||
|
properties = #{'Session-Expiry-Interval' => 3}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
{ok, Data3} = gen_tcp:recv(Sock3, 0),
|
||||||
|
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
|
||||||
|
|
||||||
|
emqx_client_sock:send(Sock3, raw_send_serialize(
|
||||||
|
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, WillData} = gen_tcp:recv(Sock2, 0),
|
||||||
|
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5),
|
||||||
|
|
||||||
|
emqx_client_sock:close(Sock2)
|
||||||
|
end),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
do_connect(Sock, ProtoVer) ->
|
do_connect(Sock, ProtoVer) ->
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
|
||||||
|
-module(emqx_protocol_tests).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
set_property_test() ->
|
||||||
|
?assertEqual(#{test => test_property}, emqx_protocol:set_property(test, test_property, undefined)),
|
||||||
|
TestMap = #{test => test_property},
|
||||||
|
?assertEqual(#{test => test_property, test1 => test_property2},
|
||||||
|
emqx_protocol:set_property(test1, test_property2, TestMap)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
init_username_test() ->
|
||||||
|
?assertEqual(<<"Peercert">>,
|
||||||
|
emqx_protocol:init_username(<<"Peercert">>, [{peer_cert_as_username, crt}])),
|
||||||
|
?assertEqual(undefined,
|
||||||
|
emqx_protocol:init_username(undefined, [{peer_cert_as_username, undefined}])).
|
|
@ -29,7 +29,9 @@ all() ->
|
||||||
groups() ->
|
groups() ->
|
||||||
[{route, [sequence],
|
[{route, [sequence],
|
||||||
[add_del_route,
|
[add_del_route,
|
||||||
match_routes]}].
|
match_routes,
|
||||||
|
has_routes,
|
||||||
|
router_add_del]}].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_ct_broker_helpers:run_setup_steps(),
|
emqx_ct_broker_helpers:run_setup_steps(),
|
||||||
|
@ -81,6 +83,7 @@ match_routes(_) ->
|
||||||
has_routes(_) ->
|
has_routes(_) ->
|
||||||
From = {self(), make_ref()},
|
From = {self(), make_ref()},
|
||||||
?R:add_route(From, <<"devices/+/messages">>, node()),
|
?R:add_route(From, <<"devices/+/messages">>, node()),
|
||||||
|
timer:sleep(200),
|
||||||
?assert(?R:has_routes(<<"devices/+/messages">>)).
|
?assert(?R:has_routes(<<"devices/+/messages">>)).
|
||||||
|
|
||||||
clear_tables() ->
|
clear_tables() ->
|
||||||
|
@ -88,28 +91,33 @@ clear_tables() ->
|
||||||
|
|
||||||
router_add_del(_) ->
|
router_add_del(_) ->
|
||||||
?R:add_route(<<"#">>),
|
?R:add_route(<<"#">>),
|
||||||
?R:add_route(<<"a/b/c">>),
|
?R:add_route(<<"a/b/c">>, node()),
|
||||||
?R:add_route(<<"+/#">>),
|
?R:add_route(<<"+/#">>),
|
||||||
Routes = [R1, R2 | _] = [
|
Routes = [R1, R2 | _] = [
|
||||||
#route{topic = <<"#">>, dest = node()},
|
#route{topic = <<"#">>, dest = node()},
|
||||||
#route{topic = <<"+/#">>, dest = node()},
|
#route{topic = <<"+/#">>, dest = node()},
|
||||||
#route{topic = <<"a/b/c">>, dest = node()}],
|
#route{topic = <<"a/b/c">>, dest = node()}],
|
||||||
|
timer:sleep(500),
|
||||||
?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
|
?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
|
||||||
|
|
||||||
|
?R:print_routes(<<"a/b/c">>),
|
||||||
|
|
||||||
%% Batch Add
|
%% Batch Add
|
||||||
lists:foreach(fun(R) -> ?R:add_route(R) end, Routes),
|
lists:foreach(fun(R) -> ?R:add_route(R) end, Routes),
|
||||||
?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
|
?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
|
||||||
|
|
||||||
%% Del
|
%% Del
|
||||||
?R:del_route(<<"a/b/c">>),
|
?R:del_route(<<"a/b/c">>, node()),
|
||||||
[R1, R2] = lists:sort(?R:match(<<"a/b/c">>)),
|
timer:sleep(500),
|
||||||
|
[R1, R2] = lists:sort(?R:match_routes(<<"a/b/c">>)),
|
||||||
{atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]),
|
{atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]),
|
||||||
|
|
||||||
%% Batch Del
|
%% Batch Del
|
||||||
R3 = #route{topic = <<"#">>, dest = 'a@127.0.0.1'},
|
R3 = #route{topic = <<"#">>, dest = 'a@127.0.0.1'},
|
||||||
?R:add_route(R3),
|
?R:add_route(R3),
|
||||||
?R:del_route(R1),
|
?R:del_route(<<"#">>),
|
||||||
?R:del_route(R2),
|
?R:del_route(R2),
|
||||||
?R:del_route(R3),
|
?R:del_route(R3),
|
||||||
[] = lists:sort(?R:match(<<"a/b/c">>)).
|
timer:sleep(500),
|
||||||
|
[] = lists:sort(?R:match_routes(<<"a/b/c">>)).
|
||||||
|
|
||||||
|
|
|
@ -24,8 +24,15 @@ all() -> [t_open_close_session].
|
||||||
t_open_close_session(_) ->
|
t_open_close_session(_) ->
|
||||||
emqx_ct_broker_helpers:run_setup_steps(),
|
emqx_ct_broker_helpers:run_setup_steps(),
|
||||||
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
|
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
|
||||||
Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid,
|
Attrs = #{clean_start => true,
|
||||||
zone => internal, username => <<"zhou">>, expiry_interval => 0, max_inflight => 0, topic_alias_maximum => 0},
|
client_id => <<"client">>,
|
||||||
|
conn_pid => ClientPid,
|
||||||
|
zone => internal,
|
||||||
|
username => <<"zhou">>,
|
||||||
|
expiry_interval => 0,
|
||||||
|
max_inflight => 0,
|
||||||
|
topic_alias_maximum => 0,
|
||||||
|
will_msg => undefined},
|
||||||
{ok, SPid} = emqx_sm:open_session(Attrs),
|
{ok, SPid} = emqx_sm:open_session(Attrs),
|
||||||
[{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
|
[{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
|
||||||
SPid = emqx_sm:lookup_session_pid(<<"client">>),
|
SPid = emqx_sm:lookup_session_pid(<<"client">>),
|
||||||
|
|
Loading…
Reference in New Issue