Merge develop to release3.1 (#2318)

This commit is contained in:
turtleDeng 2019-03-17 00:32:26 +08:00 committed by Shawn
parent b215da34e8
commit 40abc9b4fe
41 changed files with 962 additions and 1027 deletions

View File

@ -5,12 +5,12 @@ PROJECT_DESCRIPTION = EMQ X Broker
DEPS = jsx gproc gen_rpc ekka esockd cowboy replayq
dep_jsx = hex-emqx 2.9.0
dep_gproc = hex-emqx 0.8.0
dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.0
dep_jsx = git-emqx https://github.com/talentdeficit/jsx 2.9.0
dep_gproc = git-emqx https://github.com/uwiger/gproc 0.8.0
dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1
dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4
dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.3
dep_cowboy = hex-emqx 2.4.0
dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.4.0
dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1
NO_AUTOPATCH = cuttlefish
@ -31,14 +31,14 @@ EUNIT_OPTS = verbose
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \
emqx_vm_mon emqx_alarm_handler
emqx_vm_mon emqx_alarm_handler emqx_rpc
CT_NODE_NAME = emqxct@127.0.0.1
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)

View File

@ -432,12 +432,6 @@ acl_deny_action = ignore
## MQTT Protocol
##--------------------------------------------------------------------
## Response Topic Prefix
##
## Value: String
## Default: emqxrspv1
mqtt.response_topic_prefix = emqxrspv1
## Maximum MQTT packet size allowed.
##
## Value: Bytes
@ -779,16 +773,6 @@ listener.tcp.external.active_n = 100
## Value: String
listener.tcp.external.zone = external
## Mountpoint of the MQTT/TCP Listener. All the topics will be prefixed
## with the mountpoint path if this option is enabled.
##
## Variables in mountpoint path:
## - %c: clientid
## - %u: username
##
## Value: String
## listener.tcp.external.mountpoint = devicebound/
## Rate limit for the external MQTT/TCP connections. Format is 'rate,burst'.
##
## Value: rate,burst
@ -918,13 +902,6 @@ listener.tcp.internal.active_n = 1000
## Value: String
listener.tcp.internal.zone = internal
## Mountpoint of the MQTT/TCP Listener.
##
## See: listener.tcp.$name.mountpoint
##
## Value: String
## listener.tcp.internal.mountpoint = internal/
## Rate limit for the internal MQTT/TCP connections.
##
## See: listener.tcp.$name.rate_limit
@ -1030,11 +1007,6 @@ listener.ssl.external.active_n = 100
## Value: String
listener.ssl.external.zone = external
## Mountpoint of the MQTT/SSL Listener.
##
## Value: String
## listener.ssl.external.mountpoint = devicebound/
## The access control rules for the MQTT/SSL listener.
##
## See: listener.tcp.$name.access
@ -1147,6 +1119,12 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
## Value: Ciphers
listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA
## Ciphers for TLS PSK.
## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#listener.ssl.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## SSL parameter renegotiation is a feature that allows a client and a server
## to renegotiate the parameters of the SSL connection on the fly.
## RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation,
@ -1281,13 +1259,6 @@ listener.ws.external.max_conn_rate = 1000
## Value: String
listener.ws.external.zone = external
## Mountpoint of the MQTT/WebSocket Listener.
##
## See: listener.tcp.$name.mountpoint
##
## Value: String
## listener.ws.external.mountpoint = devicebound/
## The access control for the MQTT/WebSocket listener.
##
## See: listener.tcp.$name.access
@ -1427,13 +1398,6 @@ listener.wss.external.max_conn_rate = 1000
## Value: String
listener.wss.external.zone = external
## Mountpoint of the MQTT/WebSocket/SSL Listener.
##
## See: listener.tcp.$name.mountpoint
##
## Value: String
## listener.wss.external.mountpoint = devicebound/
## The access control rules for the MQTT/WebSocket/SSL listener.
##
## See: listener.tcp.$name.access.<no>
@ -1516,7 +1480,13 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
## See: listener.ssl.$name.ciphers
##
## Value: Ciphers
## listener.wss.external.ciphers =
listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA
## Ciphers for TLS PSK.
## Note that 'listener.wss.external.ciphers' and 'listener.wss.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## See: listener.ssl.$name.secure_renegotiate
##
@ -1587,8 +1557,6 @@ listener.wss.external.send_timeout_close = on
## Value: true | false
## listener.wss.external.nodelay = true
listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA
##--------------------------------------------------------------------
## Bridges
##--------------------------------------------------------------------
@ -1669,7 +1637,13 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
## SSL Ciphers used by the bridge.
##
## Value: String
## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
#bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## Ciphers for TLS PSK.
## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## Ping interval of a down bridge.
##
@ -1829,6 +1803,12 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
## Value: String
## bridge.azure.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## Ciphers for TLS PSK.
## Note that 'bridge.*.ciphers' and 'bridge.*.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#bridge.azure.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## Ping interval of a down bridge.
##
## Value: Duration

View File

@ -15,6 +15,7 @@
-ifndef(EMQX_CLIENT_HRL).
-define(EMQX_CLIENT_HRL, true).
-include("emqx_mqtt.hrl").
-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false,
packet_id, topic, props, payload}).
-endif.

View File

@ -563,11 +563,6 @@ end}.
%% MQTT Protocol
%%--------------------------------------------------------------------
%% @doc Response Topic Prefix
{mapping, "mqtt.response_topic_prefix", "emqx.response_topic_prefix",[
{datatype, string}
]}.
%% @doc Max Packet Size Allowed, 1MB by default.
{mapping, "mqtt.max_packet_size", "emqx.max_packet_size", [
{default, "1MB"},
@ -866,15 +861,16 @@ end}.
{force_shutdown_policy, ShutdownPolicy};
("mqueue_priorities", Val) ->
case Val of
"none" -> none; % NO_PRIORITY_TABLE
"none" -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE
_ ->
lists:foldl(fun(T, Acc) ->
MqueuePriorities = lists:foldl(fun(T, Acc) ->
%% NOTE: space in "= " is intended
[{Topic, Prio}] = string:tokens(T, "= "),
[Topic, Prio] = string:tokens(T, "= "),
P = list_to_integer(Prio),
(P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}),
maps:put(iolist_to_binary(Topic), P, Acc)
end, string:tokens(Val, ","))
end, #{}, string:tokens(Val, ",")),
{mqueue_priorities, MqueuePriorities}
end;
("mountpoint", Val) ->
{mountpoint, iolist_to_binary(Val)};
@ -924,10 +920,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.tcp.$name.mountpoint", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.tcp.$name.rate_limit", "emqx.listeners", [
{default, undefined},
{datatype, string}
@ -1024,10 +1016,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.ssl.$name.mountpoint", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.ssl.$name.rate_limit", "emqx.listeners", [
{default, undefined},
{datatype, string}
@ -1098,6 +1086,10 @@ end}.
{datatype, string}
]}.
{mapping, "listener.ssl.$name.psk_ciphers", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.ssl.$name.handshake_timeout", "emqx.listeners", [
{default, "15s"},
{datatype, {duration, ms}}
@ -1174,10 +1166,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.ws.$name.mountpoint", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.ws.$name.rate_limit", "emqx.listeners", [
{default, undefined},
{datatype, string}
@ -1280,10 +1268,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.wss.$name.mountpoint", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.wss.$name.rate_limit", "emqx.listeners", [
{datatype, string}
]}.
@ -1368,6 +1352,10 @@ end}.
{datatype, string}
]}.
{mapping, "listener.wss.$name.psk_ciphers", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.wss.$name.keyfile", "emqx.listeners", [
{datatype, string}
]}.
@ -1423,8 +1411,6 @@ end}.
end
end,
MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end,
Ratelimit = fun(undefined) ->
undefined;
(S) ->
@ -1442,7 +1428,6 @@ end}.
{rate_limit, Ratelimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))},
{proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)},
{proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
{mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))},
{verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
{proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
@ -1460,14 +1445,40 @@ end}.
end,
SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
MapPSKCiphers = fun(PSKCiphers) ->
lists:map(
fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha};
("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha};
("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
("PSK-RC4-SHA") -> {psk, rc4_128, sha}
end, PSKCiphers)
end,
SslOpts = fun(Prefix) ->
Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of
Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of
undefined -> undefined;
L -> [list_to_atom(V) || V <- L]
end,
TLSCiphers = cuttlefish:conf_get(Prefix++".ciphers", Conf, undefined),
PSKCiphers = cuttlefish:conf_get(Prefix++".psk_ciphers", Conf, undefined),
Ciphers =
case {TLSCiphers, PSKCiphers} of
{undefined, undefined} ->
cuttlefish:invalid(Prefix++".ciphers or "++Prefix++".psk_ciphers is absent");
{TLSCiphers, undefined} ->
SplitFun(TLSCiphers);
{undefined, PSKCiphers} ->
MapPSKCiphers(SplitFun(PSKCiphers));
{_TLSCiphers, _PSKCiphers} ->
cuttlefish:invalid(Prefix++".ciphers and "++Prefix++".psk_ciphers cannot be configured at the same time")
end,
UserLookupFun =
case PSKCiphers of
undefined -> undefined;
_ -> {fun emqx_psk:lookup/3, <<>>}
end,
Filter([{versions, Versions},
{ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))},
{ciphers, Ciphers},
{user_lookup_fun, UserLookupFun},
{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)},
{dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)},
{keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
@ -1567,6 +1578,10 @@ end}.
{datatype, string}
]}.
{mapping, "bridge.$name.psk_ciphers", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.keepalive", "emqx.bridges", [
{default, "10s"},
{datatype, {duration, ms}}
@ -1622,22 +1637,34 @@ end}.
]}.
{translation, "emqx.bridges", fun(Conf) ->
MapPSKCiphers = fun(PSKCiphers) ->
lists:map(
fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha};
("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha};
("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
("PSK-RC4-SHA") -> {psk, rc4_128, sha}
end, PSKCiphers)
end,
Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
IsSsl = fun(cacertfile) -> true;
(certfile) -> true;
(keyfile) -> true;
(ciphers) -> true;
(psk_ciphers) -> true;
(tls_versions) -> true;
(_Opt) -> false
end,
Parse = fun(tls_versions, Vers) ->
{versions, [list_to_atom(S) || S <- Split(Vers)]};
[{versions, [list_to_atom(S) || S <- Split(Vers)]}];
(ciphers, Ciphers) ->
{ciphers, Split(Ciphers)};
[{ciphers, Split(Ciphers)}];
(psk_ciphers, Ciphers) ->
[{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}];
(Opt, Val) ->
{Opt, Val}
[{Opt, Val}]
end,
Merge = fun(forwards, Val, Opts) ->
@ -1645,7 +1672,7 @@ end}.
(Opt, Val, Opts) ->
case IsSsl(Opt) of
true ->
SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])],
SslOpts = Parse(Opt, Val) ++ proplists:get_value(ssl_opts, Opts, []),
lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
false ->
[{Opt, Val}|Opts]
@ -1697,23 +1724,31 @@ end}.
Cfg#{reconnect_delay_ms => Ms};
Tr(max_inflight, Count, Cfg) ->
Cfg#{max_inflight_batches => Count};
Tr(proto_ver, Ver, Cfg) ->
Cfg#{proto_ver =>
case Ver of
mqttv3 -> v3;
mqttv4 -> v4;
mqttv5 -> v5;
_ -> v4
end};
Tr(Key, Value, Cfg) ->
Cfg#{Key => Value}
end,
maps:to_list(
lists:foldl(
C = lists:foldl(
fun({["bridge", Name, Opt], Val}, Acc) ->
%% e.g #{aws => [{OptKey, OptVal}]}
Init = [{list_to_atom(Opt), Val},
{connect_module, ConnMod(Name)},
{subscriptions, Subscriptions(Name)},
{queue, Queue(Name)}
],
C = maps:update_with(list_to_atom(Name),
fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc),
maps:fold(Translate, #{}, C);
{queue, Queue(Name)}],
maps:update_with(list_to_atom(Name), fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc);
(_, Acc) -> Acc
end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf))))
end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf))),
C1 = maps:map(fun(Bn, Bc) ->
maps:to_list(maps:fold(Translate, #{}, maps:from_list(Bc)))
end, C),
maps:to_list(C1)
end}.
%%--------------------------------------------------------------------
@ -1952,4 +1987,4 @@ end}.
[{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)},
{process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)},
{process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}]
end}.
end}.

View File

@ -1,11 +1,12 @@
{deps, [{jsx, "2.9.0"},
{gproc, "0.8.0"},
{cowboy, "2.4.0"}
{cowboy, "2.4.0"},
{meck, "0.8.13"} %% temp workaround for version check
]}.
%% appended to deps in rebar.config.script
{github_emqx_deps,
[{gen_rpc, "2.3.0"},
[{gen_rpc, "2.3.1"},
{ekka, "v0.5.3"},
{replayq, "v0.1.1"},
{esockd, "v5.4.4"},
@ -28,5 +29,3 @@
{plugins, [coveralls]}.
{profiles, [{test, [{deps, [{meck, "0.8.13"}]}]}]}.

View File

@ -29,7 +29,7 @@
-export([topics/0, subscriptions/1, subscribers/1, subscribed/2]).
%% Hooks API
-export([hook/2, hook/3, hook/4, unhook/2, run_hooks/2, run_hooks/3]).
-export([hook/2, hook/3, hook/4, unhook/2, run_hook/2, run_fold_hook/3]).
%% Shutdown and reboot
-export([shutdown/0, shutdown/1, reboot/0]).
@ -142,13 +142,13 @@ hook(HookPoint, Action, Filter, Priority) ->
unhook(HookPoint, Action) ->
emqx_hooks:del(HookPoint, Action).
-spec(run_hooks(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
run_hooks(HookPoint, Args) ->
-spec(run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
run_hook(HookPoint, Args) ->
emqx_hooks:run(HookPoint, Args).
-spec(run_hooks(emqx_hooks:hookpoint(), list(any()), any()) -> {ok | stop, any()}).
run_hooks(HookPoint, Args, Acc) ->
emqx_hooks:run(HookPoint, Args, Acc).
-spec(run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any()).
run_fold_hook(HookPoint, Args, Acc) ->
emqx_hooks:run_fold(HookPoint, Args, Acc).
%%------------------------------------------------------------------------------
%% Shutdown and reboot
@ -159,6 +159,7 @@ shutdown() ->
shutdown(Reason) ->
emqx_logger:error("emqx shutdown for ~s", [Reason]),
emqx_alarm_handler:unload(),
emqx_plugins:unload(),
lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]).

View File

@ -14,198 +14,55 @@
-module(emqx_access_control).
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-export([start_link/0]).
-export([authenticate/2]).
-export([authenticate/1]).
-export([check_acl/3, reload_acl/0]).
-export([register_mod/3, register_mod/4, unregister_mod/2]).
-export([lookup_mods/1]).
-export([stop/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
%%------------------------------------------------------------------------------
%% API
%% APIs
%%------------------------------------------------------------------------------
%% @doc Start access control server.
-spec(start_link() -> {ok, pid()} | {error, term()}).
start_link() ->
start_with(fun register_default_acl/0).
start_with(Fun) ->
case gen_server:start_link({local, ?SERVER}, ?MODULE, [], []) of
{ok, Pid} ->
Fun(), {ok, Pid};
{error, Reason} ->
{error, Reason}
end.
register_default_acl() ->
case emqx_config:get_env(acl_file) of
undefined -> ok;
File -> register_mod(acl, emqx_acl_internal, [File])
end.
-spec(authenticate(emqx_types:credentials(), emqx_types:password())
-> ok | {ok, map()} | {continue, map()} | {error, term()}).
authenticate(Credentials, Password) ->
authenticate(Credentials, Password, lookup_mods(auth)).
authenticate(Credentials, _Password, []) ->
Zone = maps:get(zone, Credentials, undefined),
case emqx_zone:get_env(Zone, allow_anonymous, false) of
true -> ok;
false -> {error, auth_modules_not_found}
end;
authenticate(Credentials, Password, [{Mod, State, _Seq} | Mods]) ->
try Mod:check(Credentials, Password, State) of
ok -> ok;
{ok, IsSuper} when is_boolean(IsSuper) ->
{ok, #{is_superuser => IsSuper}};
{ok, Result} when is_map(Result) ->
{ok, Result};
{continue, Result} when is_map(Result) ->
{continue, Result};
ignore ->
authenticate(Credentials, Password, Mods);
{error, Reason} ->
{error, Reason}
catch
error:Reason:StackTrace ->
?LOG(error, "Authenticate failed. StackTrace: ~p", [StackTrace]),
{error, Reason}
-spec(authenticate(emqx_types:credentials())
-> {ok, emqx_types:credentials()} | {error, term()}).
authenticate(Credentials) ->
case emqx_hooks:run_fold('client.authenticate', [], Credentials#{result => init_result(Credentials)}) of
#{result := success} = NewCredentials ->
{ok, NewCredentials};
NewCredentials ->
{error, maps:get(result, NewCredentials, unknown_error)}
end.
%% @doc Check ACL
-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny).
check_acl(Credentials, PubSub, Topic) when PubSub =:= publish; PubSub =:= subscribe ->
check_acl(Credentials, PubSub, Topic, lookup_mods(acl), emqx_acl_cache:is_enabled()).
check_acl(Credentials, PubSub, Topic, AclMods, false) ->
do_check_acl(Credentials, PubSub, Topic, AclMods);
check_acl(Credentials, PubSub, Topic, AclMods, true) ->
case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
not_found ->
AclResult = do_check_acl(Credentials, PubSub, Topic, AclMods),
emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
AclResult;
AclResult ->
AclResult
check_acl(Credentials, PubSub, Topic) ->
case emqx_acl_cache:is_enabled() of
false ->
do_check_acl(Credentials, PubSub, Topic);
true ->
case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
not_found ->
AclResult = do_check_acl(Credentials, PubSub, Topic),
emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
AclResult;
AclResult ->
AclResult
end
end.
do_check_acl(#{zone := Zone}, _PubSub, _Topic, []) ->
emqx_zone:get_env(Zone, acl_nomatch, deny);
do_check_acl(Credentials, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
case Mod:check_acl({Credentials, PubSub, Topic}, State) of
allow -> allow;
deny -> deny;
ignore -> do_check_acl(Credentials, PubSub, Topic, AclMods)
do_check_acl(#{zone := Zone} = Credentials, PubSub, Topic) ->
case emqx_hooks:run_fold('client.check_acl', [Credentials, PubSub, Topic],
emqx_zone:get_env(Zone, acl_nomatch, deny)) of
allow -> allow;
_ -> deny
end.
-spec(reload_acl() -> list(ok | {error, term()})).
reload_acl() ->
[Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)].
emqx_mod_acl_internal:reload_acl().
%% @doc Register an Auth/ACL module.
-spec(register_mod(auth | acl, module(), list()) -> ok | {error, term()}).
register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl ->
register_mod(Type, Mod, Opts, 0).
-spec(register_mod(auth | acl, module(), list(), non_neg_integer())
-> ok | {error, term()}).
register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl->
gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}).
%% @doc Unregister an Auth/ACL module.
-spec(unregister_mod(auth | acl, module()) -> ok | {error, not_found | term()}).
unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl ->
gen_server:call(?SERVER, {unregister_mod, Type, Mod}).
%% @doc Lookup all Auth/ACL modules.
-spec(lookup_mods(auth | acl) -> list()).
lookup_mods(Type) ->
case ets:lookup(?TAB, tab_key(Type)) of
[] -> [];
[{_, Mods}] -> Mods
end.
tab_key(auth) -> auth_modules;
tab_key(acl) -> acl_modules.
stop() ->
gen_server:stop(?SERVER, normal, infinity).
%%-----------------------------------------------------------------------------
%% gen_server callbacks
%%-----------------------------------------------------------------------------
init([]) ->
ok = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]),
{ok, #{}}.
handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
Mods = lookup_mods(Type),
reply(case lists:keymember(Mod, 1, Mods) of
true -> {error, already_exists};
false ->
try Mod:init(Opts) of
{ok, ModState} ->
NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) ->
Seq1 >= Seq2
end, [{Mod, ModState, Seq} | Mods]),
ets:insert(?TAB, {tab_key(Type), NewMods}),
ok
catch
_:Error ->
emqx_logger:error("[AccessControl] Failed to init ~s: ~p", [Mod, Error]),
{error, Error}
end
end, State);
handle_call({unregister_mod, Type, Mod}, _From, State) ->
Mods = lookup_mods(Type),
reply(case lists:keyfind(Mod, 1, Mods) of
false ->
{error, not_found};
{Mod, _ModState, _Seq} ->
ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), ok
end, State);
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(Req, _From, State) ->
emqx_logger:error("[AccessControl] unexpected request: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[AccessControl] unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
emqx_logger:error("[AccessControl] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
reply(Reply, State) ->
{reply, Reply, State}.
init_result(Credentials) ->
case emqx_zone:get_env(maps:get(zone, Credentials, undefined), allow_anonymous, false) of
true -> success;
false -> not_authorized
end.

View File

@ -16,6 +16,8 @@
-include("emqx.hrl").
-type(acl_result() :: allow | deny).
-type(who() :: all | binary() |
{client, binary()} |
{user, binary()} |
@ -23,10 +25,8 @@
-type(access() :: subscribe | publish | pubsub).
-type(rule() :: {allow, all} |
{allow, who(), access(), list(emqx_topic:topic())} |
{deny, all} |
{deny, who(), access(), list(emqx_topic:topic())}).
-type(rule() :: {acl_result(), all} |
{acl_result(), who(), access(), list(emqx_topic:topic())}).
-export_type([rule/0]).

View File

@ -32,6 +32,7 @@
terminate/2]).
-export([load/0,
unload/0,
get_alarms/0]).
-record(common_alarm, {id, desc}).
@ -68,6 +69,10 @@ mnesia(copy) ->
load() ->
gen_event:swap_handler(alarm_handler, {alarm_handler, swap}, {?MODULE, []}).
%% on the way shutting down, give it back to OTP
unload() ->
gen_event:swap_handler(alarm_handler, {?MODULE, swap}, {alarm_handler, []}).
get_alarms() ->
gen_event:call(alarm_handler, ?MODULE, get_alarms).

View File

@ -43,7 +43,7 @@ start(_Type, _Args) ->
emqx_alarm_handler:load(),
emqx_logger_handler:init(),
print_vsn(),
{ok, Sup}.

View File

@ -345,10 +345,10 @@ connected(internal, maybe_send, State) ->
{next_state, connecting, disconnect(NewState)}
end;
connected(info, {disconnected, ConnRef, Reason},
#{conn_ref := ConnRefCurrent, connection := Conn} = State) ->
#{conn_ref := ConnRefCurrent} = State) ->
case ConnRefCurrent =:= ConnRef of
true ->
?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Conn, Reason]),
?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Reason]),
{next_state, connecting,
State#{conn_ref := undefined, connection := undefined}};
false ->

View File

@ -110,7 +110,7 @@ safe_stop(Pid, StopF, Timeout) ->
send(Conn, Batch) ->
send(Conn, Batch, []).
send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest] = Batch, Acc) ->
send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Rest], Acc) ->
case emqx_client:publish(ClientPid, Msg) of
{ok, PktId} when Rest =:= [] ->
%% last one sent
@ -119,9 +119,6 @@ send(#{client_pid := ClientPid, ack_collector := AckCollector} = Conn, [Msg | Re
{ok, Ref};
{ok, PktId} ->
send(Conn, Rest, [PktId | Acc]);
{error, {_PacketId, inflight_full}} ->
timer:sleep(10),
send(Conn, Batch, Acc);
{error, Reason} ->
%% NOTE: There is no partial sucess of a batch and recover from the middle
%% only to retry all messages in one batch

View File

@ -167,13 +167,14 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
-spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
publish(Msg) when is_record(Msg, message) ->
_ = emqx_tracer:trace(publish, Msg),
case emqx_hooks:run('message.publish', [], Msg) of
{ok, Msg1 = #message{topic = Topic}} ->
Headers = Msg#message.headers,
case emqx_hooks:run_fold('message.publish', [], Msg#message{headers = Headers#{allow_publish => true}}) of
#message{headers = #{allow_publish := false}} ->
?WARN("Publishing interrupted: ~s", [emqx_message:format(Msg)]),
[];
#message{topic = Topic} = Msg1 ->
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
Delivery#delivery.results;
{stop, _} ->
?WARN("Stop publishing: ~s", [emqx_message:format(Msg)]),
[]
Delivery#delivery.results
end.
%% Called internally
@ -443,5 +444,4 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------

View File

@ -17,12 +17,9 @@
-behaviour(gen_statem).
-include("types.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_client.hrl").
-export([start_link/0, start_link/1]).
-export([request/5, request/6, request_async/7, receive_response/3]).
-export([set_request_handler/2, sub_request_topic/3, sub_request_topic/4]).
-export([connect/1]).
-export([subscribe/2, subscribe/3, subscribe/4]).
-export([publish/2, publish/3, publish/4, publish/5]).
@ -38,12 +35,10 @@
%% For test cases
-export([pause/1, resume/1]).
-export([initialized/3, waiting_for_connack/3, connected/3]).
-export([initialized/3, waiting_for_connack/3, connected/3, inflight_full/3]).
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0,
request_input/0, response_payload/0, request_handler/0,
corr_data/0, mqtt_msg/0]).
-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, mqtt_msg/0]).
-export_type([host/0, option/0]).
@ -57,24 +52,12 @@
-define(WILL_MSG(QoS, Retain, Topic, Props, Payload),
#mqtt_msg{qos = QoS, retain = Retain, topic = Topic, props = Props, payload = Payload}).
-define(RESPONSE_TIMEOUT_SECONDS, timer:seconds(5)).
-define(NO_REQ_HANDLER, undefined).
-define(NO_GROUP, <<>>).
-define(NO_CLIENT_ID, <<>>).
-type(host() :: inet:ip_address() | inet:hostname()).
-type(corr_data() :: binary()).
%% NOTE: Message handler is different from request handler.
%% Message handler is a set of callbacks defined to handle MQTT messages as well as
%% the disconnect event.
%% Request handler is a callback to handle received MQTT message as in 'request',
%% and publish another MQTT message back to the defined topic as in 'response'.
%% `owner' and `msg_handler' has no effect when `request_handler' is set.
%% Message handler is a set of callbacks defined to handle MQTT messages
%% as well as the disconnect event.
-define(NO_MSG_HDLR, undefined).
-type(msg_handler() :: #{puback := fun((_) -> any()),
publish := fun((emqx_types:message()) -> any()),
@ -100,7 +83,6 @@
| {keepalive, non_neg_integer()}
| {max_inflight, pos_integer()}
| {retry_interval, timeout()}
| {request_handler, request_handler()}
| {will_topic, iodata()}
| {will_payload, iodata()}
| {will_retain, boolean()}
@ -146,7 +128,6 @@
ack_timer :: reference(),
retry_interval :: pos_integer(),
retry_timer :: reference(),
request_handler :: request_handler(),
session_present :: boolean(),
last_packet_id :: packet_id(),
parse_state :: emqx_frame:state()}).
@ -176,35 +157,10 @@
-type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}).
-type(request_input() :: binary()).
-type(response_payload() :: binary()).
-type(request_handler() :: fun((request_input()) -> response_payload())).
-type(group() :: binary()).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
%% @doc Swap in a new request handler on the fly.
-spec(set_request_handler(client(), request_handler()) -> ok).
set_request_handler(Responser, RequestHandler) ->
gen_statem:call(Responser, {set_request_handler, RequestHandler}).
%% @doc Subscribe to request topic.
-spec(sub_request_topic(client(), qos(), topic()) -> ok).
sub_request_topic(Client, QoS, Topic) ->
sub_request_topic(Client, QoS, Topic, ?NO_GROUP).
%% @doc Share-subscribe to request topic.
-spec(sub_request_topic(client(), qos(), topic(), group()) -> ok).
sub_request_topic(Client, QoS, Topic, Group) ->
Properties = get_properties(Client),
NewTopic = make_req_rsp_topic(Properties, Topic, Group),
subscribe_req_rsp_topic(Client, QoS, NewTopic).
-spec(start_link() -> gen_statem:start_ret()).
start_link() -> start_link([]).
@ -293,76 +249,6 @@ parse_subopt([{nl, false} | Opts], Result) ->
parse_subopt([{qos, QoS} | Opts], Result) ->
parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}).
-spec(request(client(), topic(), topic(), payload(), qos() | [pubopt()])
-> ok | {ok, packet_id()} | {error, term()}).
request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), is_atom(QoS) ->
request(Client, ResponseTopic, RequestTopic, Payload, [{qos, ?QOS_I(QoS)}]);
request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), ?IS_QOS(QoS) ->
request(Client, ResponseTopic, RequestTopic, Payload, [{qos, QoS}]);
request(Client, ResponseTopic, RequestTopic, Payload, Opts) when is_binary(ResponseTopic), is_list(Opts) ->
request(Client, ResponseTopic, RequestTopic, Payload, Opts, _Properties = #{}).
%% @doc Send a request to request topic and wait for response.
-spec(request(client(), topic(), topic(), payload(), [pubopt()], properties())
-> {ok, response_payload()} | {error, term()}).
request(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties) ->
CorrData = make_corr_data(),
case request_async(Client, ResponseTopic, RequestTopic,
Payload, Opts, Properties, CorrData) of
ok -> receive_response(Client, CorrData, Opts);
{error, Reason} -> {error, Reason}
end.
%% @doc Get client properties.
-spec(get_properties(client()) -> properties()).
get_properties(Client) -> gen_statem:call(Client, get_properties, infinity).
%% @doc Send a request, but do not wait for response.
%% The caller should expect a `{publish, Response}' message,
%% or call `receive_response/3' to receive the message.
-spec(request_async(client(), topic(), topic(), payload(),
[pubopt()], properties(), corr_data()) -> ok | {error, any()}).
request_async(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties, CorrData)
when is_binary(ResponseTopic),
is_binary(RequestTopic),
is_map(Properties),
is_list(Opts) ->
ok = emqx_mqtt_props:validate(Properties),
Retain = proplists:get_bool(retain, Opts),
QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)),
ClientProperties = get_properties(Client),
NewResponseTopic = make_req_rsp_topic(ClientProperties, ResponseTopic),
NewRequestTopic = make_req_rsp_topic(ClientProperties, RequestTopic),
%% This is perhaps not optimal to subscribe the response topic for
%% each and every request even though the response topic is always the same
ok = sub_response_topic(Client, QoS, NewResponseTopic),
NewProperties = maps:merge(Properties, #{'Response-Topic' => NewResponseTopic,
'Correlation-Data' => CorrData}),
case publish(Client, #mqtt_msg{qos = QoS,
retain = Retain,
topic = NewRequestTopic,
props = NewProperties,
payload = iolist_to_binary(Payload)}) of
ok -> ok;
{ok, _PacketId} -> ok; %% assume auto_ack
{error, Reason} -> {error, Reason}
end.
%% @doc Block wait the response for a request sent earlier.
-spec(receive_response(client(), corr_data(), [pubopt()])
-> {ok, response_payload()} | {error, any()}).
receive_response(Client, CorrData, Opts) ->
TimeOut = proplists:get_value(timeout, Opts, ?RESPONSE_TIMEOUT_SECONDS),
MRef = erlang:monitor(process, Client),
TRef = erlang:start_timer(TimeOut, self(), response),
try
receive_response(Client, CorrData, TRef, MRef)
after
erlang:cancel_timer(TRef),
receive {timeout, TRef, _} -> ok after 0 -> ok end,
erlang:demonitor(MRef, [flush])
end.
-spec(publish(client(), topic(), payload()) -> ok | {error, term()}).
publish(Client, Topic, Payload) when is_binary(Topic) ->
publish(Client, #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)}).
@ -511,7 +397,6 @@ init([Options]) ->
auto_ack = true,
ack_timeout = ?DEFAULT_ACK_TIMEOUT,
retry_interval = 0,
request_handler = ?NO_REQ_HANDLER,
connect_timeout = ?DEFAULT_CONNECT_TIMEOUT,
last_packet_id = 1}),
{ok, initialized, init_parse_state(State)}.
@ -616,8 +501,6 @@ init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) ->
init(Opts, State#state{auto_ack = AutoAck});
init([{retry_interval, I} | Opts], State) ->
init(Opts, State#state{retry_interval = timer:seconds(I)});
init([{request_handler, Handler} | Opts], State) ->
init(Opts, State#state{request_handler = Handler});
init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) ->
init(Opts, State#state{bridge_mode = Mode});
init([_Opt | Opts], State) ->
@ -743,12 +626,12 @@ waiting_for_connack(EventType, EventContent, State) ->
false -> {stop, connack_timeout}
end.
connected({call, From}, subscriptions, State = #state{subscriptions = Subscriptions}) ->
{keep_state, State, [{reply, From, maps:to_list(Subscriptions)}]};
connected({call, From}, subscriptions, #state{subscriptions = Subscriptions}) ->
{keep_state_and_data, [{reply, From, maps:to_list(Subscriptions)}]};
connected({call, From}, info, State) ->
Info = lists:zip(record_info(fields, state), tl(tuple_to_list(State))),
{keep_state, State, [{reply, From, Info}]};
{keep_state_and_data, [{reply, From, Info}]};
connected({call, From}, pause, State) ->
{keep_state, State#state{paused = true}, [{reply, From, ok}]};
@ -756,14 +639,8 @@ connected({call, From}, pause, State) ->
connected({call, From}, resume, State) ->
{keep_state, State#state{paused = false}, [{reply, From, ok}]};
connected({call, From}, get_properties, State = #state{properties = Properties}) ->
{keep_state, State, [{reply, From, Properties}]};
connected({call, From}, client_id, State = #state{client_id = ClientId}) ->
{keep_state, State, [{reply, From, ClientId}]};
connected({call, From}, {set_request_handler, RequestHandler}, State) ->
{keep_state, State#state{request_handler = RequestHandler}, [{reply, From, ok}]};
connected({call, From}, client_id, #state{client_id = ClientId}) ->
{keep_state_and_data, [{reply, From, ClientId}]};
connected({call, From}, SubReq = {subscribe, Properties, Topics},
State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) ->
@ -790,20 +667,19 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) ->
connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}},
State = #state{inflight = Inflight, last_packet_id = PacketId})
when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) ->
case emqx_inflight:is_full(Inflight) of
true ->
{keep_state, State, [{reply, From, {error, {PacketId, inflight_full}}}]};
false ->
Msg1 = Msg#mqtt_msg{packet_id = PacketId},
case send(Msg1, State) of
{ok, NewState} ->
Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight),
{keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}),
[{reply, From, {ok, PacketId}}]};
{error, Reason} ->
{stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]}
end
end;
Msg1 = Msg#mqtt_msg{packet_id = PacketId},
case send(Msg1, State) of
{ok, NewState} ->
Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight),
State1 = ensure_retry_timer(NewState#state{inflight = Inflight1}),
Actions = [{reply, From, {ok, PacketId}}],
case emqx_inflight:is_full(Inflight1) of
true -> {next_state, inflight_full, State1, Actions};
false -> {keep_state, State1, Actions}
end;
{error, Reason} ->
{stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]}
end;
connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics},
State = #state{last_packet_id = PacketId}) ->
@ -844,43 +720,20 @@ connected(cast, {pubrel, PacketId, ReasonCode, Properties}, State) ->
connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) ->
send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State);
connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), State = #state{paused = true}) ->
{keep_state, State};
connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) ->
keep_state_and_data;
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, Properties, Payload),
State) when Properties =/= undefined ->
NewState = response_publish(Properties, State, ?QOS_0, Payload),
{keep_state, deliver(packet_to_msg(Packet), NewState)};
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) ->
{keep_state, deliver(packet_to_msg(Packet), State)};
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, Properties, Payload), State)
when Properties =/= undefined ->
NewState = response_publish(Properties, State, ?QOS_1, Payload),
publish_process(?QOS_1, Packet, NewState);
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
publish_process(?QOS_1, Packet, State);
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, Properties, Payload), State)
when Properties =/= undefined ->
NewState = response_publish(Properties, State, ?QOS_2, Payload),
publish_process(?QOS_2, Packet, NewState);
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
publish_process(?QOS_2, Packet, State);
connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties),
State = #state{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} ->
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
reason_code => ReasonCode,
properties => Properties}),
{keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}};
none ->
emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]),
{keep_state, State}
end;
connected(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) ->
{keep_state, delete_inflight(PubAck, State)};
connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) ->
send_puback(?PUBREL_PACKET(PacketId),
@ -908,21 +761,11 @@ connected(cast, ?PUBREL_PACKET(PacketId),
end;
error ->
emqx_logger:warning("Unexpected PUBREL: ~p", [PacketId]),
{keep_state, State}
keep_state_and_data
end;
connected(cast, ?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
State = #state{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {pubrel, _PacketId, _Ts}} ->
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
reason_code => ReasonCode,
properties => Properties}),
{keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}};
none ->
emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]),
{keep_state, State}
end;
connected(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) ->
{keep_state, delete_inflight(PubComp, State)};
connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes),
State = #state{subscriptions = _Subscriptions}) ->
@ -931,7 +774,8 @@ connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes),
%%TODO: Merge reason codes to subscriptions?
Reply = {ok, Properties, ReasonCodes},
{keep_state, NewState, [{reply, From, Reply}]};
false -> {keep_state, State}
false ->
keep_state_and_data
end;
connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes),
@ -944,16 +788,18 @@ connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes),
end, Subscriptions, Topics),
{keep_state, NewState#state{subscriptions = Subscriptions1},
[{reply, From, {ok, Properties, ReasonCodes}}]};
false -> {keep_state, State}
false ->
keep_state_and_data
end;
connected(cast, ?PACKET(?PINGRESP), State = #state{pending_calls = []}) ->
{keep_state, State};
connected(cast, ?PACKET(?PINGRESP), #state{pending_calls = []}) ->
keep_state_and_data;
connected(cast, ?PACKET(?PINGRESP), State) ->
case take_call(ping, State) of
{value, #call{from = From}, NewState} ->
{keep_state, NewState, [{reply, From, pong}]};
false -> {keep_state, State}
false ->
keep_state_and_data
end;
connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) ->
@ -998,14 +844,16 @@ connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef,
connected(EventType, EventContent, Data) ->
handle_event(EventType, EventContent, connected, Data).
should_ping(Sock) ->
case emqx_client_sock:getstat(Sock, [send_oct]) of
{ok, [{send_oct, Val}]} ->
OldVal = get(send_oct), put(send_oct, Val),
OldVal == undefined orelse OldVal == Val;
Error = {error, _Reason} ->
Error
end.
inflight_full({call, _From}, {publish, #mqtt_msg{qos = QoS}}, _State) when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) ->
{keep_state_and_data, [postpone]};
inflight_full(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) ->
delete_inflight_when_full(PubAck, State);
inflight_full(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) ->
delete_inflight_when_full(PubComp, State);
inflight_full(EventType, EventContent, Data) ->
%% inflight_full is a sub-state of connected state,
%% delegate all other events to connected state.
connected(EventType, EventContent, Data).
handle_event({call, From}, stop, _StateName, _State) ->
{stop_and_reply, normal, [{reply, From, ok}]};
@ -1028,17 +876,17 @@ handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) ->
emqx_logger:debug("[~p] Got EXIT from owner, Reason: ~p", [?MODULE, Reason]),
{stop, {shutdown, Reason}, State};
handle_event(info, {inet_reply, _Sock, ok}, _, State) ->
{keep_state, State};
handle_event(info, {inet_reply, _Sock, ok}, _, _State) ->
keep_state_and_data;
handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
emqx_logger:error("[~p] got tcp error: ~p", [?MODULE, Reason]),
{stop, {shutdown, Reason}, State};
handle_event(EventType, EventContent, StateName, StateData) ->
handle_event(EventType, EventContent, StateName, _StateData) ->
emqx_logger:error("State: ~s, Unexpected Event: (~p, ~p)",
[StateName, EventType, EventContent]),
{keep_state, StateData}.
keep_state_and_data.
%% Mandatory callback functions
terminate(Reason, _StateName, State = #state{socket = Socket}) ->
@ -1061,57 +909,47 @@ code_change(_Vsn, State, Data, _Extra) ->
%% Internal functions
%%------------------------------------------------------------------------------
%% Subscribe to response topic.
-spec(sub_response_topic(client(), qos(), topic()) -> ok).
sub_response_topic(Client, QoS, Topic) when is_binary(Topic) ->
subscribe_req_rsp_topic(Client, QoS, Topic).
receive_response(Client, CorrData, TRef, MRef) ->
receive
{publish, Response} ->
{ok, Properties} = maps:find(properties, Response),
case maps:find('Correlation-Data', Properties) of
{ok, CorrData} ->
maps:find(payload, Response);
_ ->
emqx_logger:debug("Discarded stale response: ~p", [Response]),
receive_response(Client, CorrData, TRef, MRef)
end;
{timeout, TRef, response} ->
{error, timeout};
{'DOWN', MRef, process, _, _} ->
{error, client_down}
should_ping(Sock) ->
case emqx_client_sock:getstat(Sock, [send_oct]) of
{ok, [{send_oct, Val}]} ->
OldVal = get(send_oct), put(send_oct, Val),
OldVal == undefined orelse OldVal == Val;
Error = {error, _Reason} ->
Error
end.
%% Make a unique correlation data for each request.
%% It has to be unique because stale responses should be discarded.
make_corr_data() -> term_to_binary(make_ref()).
delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties),
State = #state{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} ->
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
reason_code => ReasonCode,
properties => Properties}),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]),
State
end;
delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
State = #state{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {pubrel, _PacketId, _Ts}} ->
ok = eval_msg_handler(State, puback, #{packet_id => PacketId,
reason_code => ReasonCode,
properties => Properties}),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]),
State
end.
%% Shared function for request and response topic subscription.
subscribe_req_rsp_topic(Client, QoS, Topic) ->
%% It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription
{ok, _Props, _QoS} = subscribe(Client, [{Topic, [{rh, 2}, {rap, false},
{nl, not ?IS_SHARE(Topic)},
{qos, QoS}]}]),
emqx_logger:debug("Subscribed to topic ~s", [Topic]),
ok.
%% Make a request or response topic.
make_req_rsp_topic(Properties, Topic) ->
make_req_rsp_topic(Properties, Topic, ?NO_GROUP).
%% Same as make_req_rsp_topic/2, but allow shared subscription (for request topics)
make_req_rsp_topic(Properties, Topic, Group) ->
case maps:find('Response-Information', Properties) of
{ok, ResponseInformation} when ResponseInformation =/= <<>> ->
emqx_topic:join([req_rsp_topic_prefix(Group, ResponseInformation), Topic]);
_ ->
erlang:error(no_response_information)
delete_inflight_when_full(Packet, State0) ->
State = #state{inflight = Inflight} = delete_inflight(Packet, State0),
case emqx_inflight:is_full(Inflight) of
true -> {keep_state, State};
false -> {next_state, connected, State}
end.
req_rsp_topic_prefix(?NO_GROUP, Prefix) -> Prefix;
req_rsp_topic_prefix(Group, Prefix) -> ?SHARE(Group, Prefix).
assign_id(?NO_CLIENT_ID, Props) ->
case maps:find('Assigned-Client-Identifier', Props) of
{ok, Value} ->
@ -1137,49 +975,6 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId),
Stop -> Stop
end.
response_publish(#{'Response-Topic' := ResponseTopic} = Properties,
State = #state{request_handler = RequestHandler}, QoS, Payload)
when RequestHandler =/= ?NO_REQ_HANDLER ->
do_publish(ResponseTopic, Properties, State, QoS, Payload);
response_publish(_Properties, State, _QoS, _Payload) -> State.
do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler}, ?QOS_0, Payload) ->
Msg = #mqtt_msg{qos = ?QOS_0,
retain = false,
topic = ResponseTopic,
props = Properties,
payload = RequestHandler(Payload)
},
case send(Msg, State) of
{ok, NewState} -> NewState;
_Error -> State
end;
do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler,
inflight = Inflight,
last_packet_id = PacketId},
QoS, Payload)
when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2)->
case emqx_inflight:is_full(Inflight) of
true ->
emqx_logger:error("Inflight is full"),
State;
false ->
Msg = #mqtt_msg{packet_id = PacketId,
qos = QoS,
retain = false,
topic = ResponseTopic,
props = Properties,
payload = RequestHandler(Payload)},
case send(Msg, State) of
{ok, NewState} ->
Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg, os:timestamp()}, Inflight),
ensure_retry_timer(NewState#state{inflight = Inflight1});
{error, Reason} ->
emqx_logger:error("Send failed: ~p", [Reason]),
State
end
end.
ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) ->
ensure_keepalive_timer(timer:seconds(Secs), State);
ensure_keepalive_timer(State = #state{keepalive = 0}) ->
@ -1222,11 +1017,12 @@ ensure_ack_timer(State = #state{ack_timer = undefined,
ensure_ack_timer(State) -> State.
ensure_retry_timer(State = #state{retry_interval = Interval}) ->
ensure_retry_timer(Interval, State).
ensure_retry_timer(Interval, State = #state{retry_timer = undefined})
do_ensure_retry_timer(Interval, State).
do_ensure_retry_timer(Interval, State = #state{retry_timer = undefined})
when Interval > 0 ->
State#state{retry_timer = erlang:start_timer(Interval, self(), retry)};
ensure_retry_timer(_Interval, State) ->
do_ensure_retry_timer(_Interval, State) ->
State.
retry_send(State = #state{inflight = Inflight}) ->
@ -1243,7 +1039,7 @@ retry_send([{Type, Msg, Ts} | Msgs], Now, State = #state{retry_interval = Interv
{ok, NewState} -> retry_send(Msgs, Now, NewState);
{error, Error} -> {stop, Error}
end;
false -> {keep_state, ensure_retry_timer(Interval - Diff, State)}
false -> {keep_state, do_ensure_retry_timer(Interval - Diff, State)}
end.
retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId},
@ -1265,9 +1061,6 @@ retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) ->
Error
end.
deliver(_Msg, State = #state{request_handler = Hdlr}) when Hdlr =/= ?NO_REQ_HANDLER ->
%% message has been terminated by request handler, hence should not continue processing
State;
deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
topic = Topic, props = Props, payload = Payload},
State) ->
@ -1277,17 +1070,17 @@ deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
ok = eval_msg_handler(State, publish, Msg),
State.
eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER,
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR,
owner = Owner},
disconnected, {ReasonCode, Properties}) ->
%% Special handling for disconnected message when there is no handler callback
Owner ! {disconnected, ReasonCode, Properties},
ok;
eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER},
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR},
disconnected, _OtherReason) ->
%% do nothing to be backward compatible
ok;
eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER,
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR,
owner = Owner}, Kind, Msg) ->
Owner ! {Kind, Msg},
ok;

View File

@ -141,7 +141,15 @@ init({Transport, RawSocket, Options}) ->
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
SendFun = fun(Data) -> Transport:async_send(Socket, Data) end,
SendFun = fun(Packet, SeriaOpts) ->
Data = emqx_frame:serialize(Packet, SeriaOpts),
case Transport:async_send(Socket, Data) of
ok ->
{ok, Data};
{error, Reason} ->
{error, Reason}
end
end,
ProtoState = emqx_protocol:init(#{peername => Peername,
sockname => Sockname,
peercert => Peercert,
@ -484,4 +492,3 @@ shutdown(Reason, State) ->
stop(Reason, State) ->
{stop, Reason, State}.

View File

@ -22,12 +22,20 @@
-export([start_link/0, stop/0]).
%% Hooks API
-export([add/2, add/3, add/4, del/2, run/2, run/3, lookup/1]).
-export([add/2, add/3, add/4, del/2, run/2, run_fold/3, lookup/1]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
%% Multiple callbacks can be registered on a hookpoint.
%% The execution order depends on the priority value:
%% - Callbacks with greater priority values will be run before
%% the ones with lower priority values. e.g. A Callback with
%% priority = 2 precedes the callback with priority = 1.
%% - The execution order is the adding order of callbacks if they have
%% equal priority values.
-type(hookpoint() :: atom()).
-type(action() :: function() | mfa()).
-type(filter() :: function() | mfa()).
@ -56,14 +64,14 @@ stop() ->
%%------------------------------------------------------------------------------
%% @doc Register a callback
-spec(add(hookpoint(), action() | #callback{}) -> emqx_types:ok_or_error(already_exists)).
-spec(add(hookpoint(), action() | #callback{}) -> ok_or_error(already_exists)).
add(HookPoint, Callback) when is_record(Callback, callback) ->
gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity);
add(HookPoint, Action) when is_function(Action); is_tuple(Action) ->
add(HookPoint, #callback{action = Action, priority = 0}).
-spec(add(hookpoint(), action(), filter() | integer() | list())
-> emqx_types:ok_or_error(already_exists)).
-> ok_or_error(already_exists)).
add(HookPoint, Action, InitArgs) when is_function(Action), is_list(InitArgs) ->
add(HookPoint, #callback{action = {Action, InitArgs}, priority = 0});
add(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) ->
@ -72,8 +80,8 @@ add(HookPoint, Action, Priority) when is_integer(Priority) ->
add(HookPoint, #callback{action = Action, priority = Priority}).
-spec(add(hookpoint(), action(), filter(), integer())
-> emqx_types:ok_or_error(already_exists)).
add(HookPoint, Action, Filter, Priority) ->
-> ok_or_error(already_exists)).
add(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
%% @doc Unregister a callback.
@ -82,47 +90,53 @@ del(HookPoint, Action) ->
gen_server:cast(?SERVER, {del, HookPoint, Action}).
%% @doc Run hooks.
-spec(run(atom(), list(Arg :: any())) -> ok | stop).
-spec(run(atom(), list(Arg::term())) -> ok).
run(HookPoint, Args) ->
run_(lookup(HookPoint), Args).
do_run(lookup(HookPoint), Args).
%% @doc Run hooks with Accumulator.
-spec(run(atom(), list(Arg :: any()), any()) -> any()).
run(HookPoint, Args, Acc) ->
run_(lookup(HookPoint), Args, Acc).
-spec(run_fold(atom(), list(Arg::term()), Acc::term()) -> Acc::term()).
run_fold(HookPoint, Args, Acc) ->
do_run_fold(lookup(HookPoint), Args, Acc).
%% @private
run_([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
case filtered(Filter, Args) orelse execute(Action, Args) of
true -> run_(Callbacks, Args);
ok -> run_(Callbacks, Args);
stop -> stop;
_Any -> run_(Callbacks, Args)
do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
case filter_passed(Filter, Args) andalso execute(Action, Args) of
%% stop the hook chain and return
stop -> ok;
%% continue the hook chain, in following cases:
%% - the filter validation failed with 'false'
%% - the callback returns any term other than 'stop'
_ -> do_run(Callbacks, Args)
end;
run_([], _Args) ->
do_run([], _Args) ->
ok.
%% @private
run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
do_run_fold([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
Args1 = Args ++ [Acc],
case filtered(Filter, Args1) orelse execute(Action, Args1) of
true -> run_(Callbacks, Args, Acc);
ok -> run_(Callbacks, Args, Acc);
{ok, NewAcc} -> run_(Callbacks, Args, NewAcc);
stop -> {stop, Acc};
{stop, NewAcc} -> {stop, NewAcc};
_Any -> run_(Callbacks, Args, Acc)
case filter_passed(Filter, Args1) andalso execute(Action, Args1) of
%% stop the hook chain
stop -> Acc;
%% stop the hook chain with NewAcc
{stop, NewAcc} -> NewAcc;
%% continue the hook chain with NewAcc
{ok, NewAcc} -> do_run_fold(Callbacks, Args, NewAcc);
%% continue the hook chain, in following cases:
%% - the filter validation failed with 'false'
%% - the callback returns any term other than 'stop' or {'stop', NewAcc}
_ -> do_run_fold(Callbacks, Args, Acc)
end;
run_([], _Args, Acc) ->
{ok, Acc}.
do_run_fold([], _Args, Acc) ->
Acc.
filtered(undefined, _Args) ->
false;
filtered(Filter, Args) ->
-spec(filter_passed(filter(), Args::term()) -> true | false).
filter_passed(undefined, _Args) -> true;
filter_passed(Filter, Args) ->
execute(Filter, Args).
execute(Action, Args) when is_function(Action) ->
erlang:apply(Action, Args);
%% @doc execute a function.
execute(Fun, Args) when is_function(Fun) ->
erlang:apply(Fun, Args);
execute({Fun, InitArgs}, Args) when is_function(Fun) ->
erlang:apply(Fun, Args ++ InitArgs);
execute({M, F, A}, Args) ->
@ -142,11 +156,11 @@ lookup(HookPoint) ->
%%------------------------------------------------------------------------------
init([]) ->
ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}, protected]),
{ok, #{}}.
handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) ->
Reply = case lists:keymember(Action, 2, Callbacks = lookup(HookPoint)) of
Reply = case lists:keymember(Action, #callback.action, Callbacks = lookup(HookPoint)) of
true ->
{error, already_exists};
false ->

View File

@ -81,11 +81,12 @@ unset_flag(Flag, Msg = #message{flags = Flags}) ->
false -> Msg
end.
-spec(set_headers(map(), emqx_types:message()) -> emqx_types:message()).
-spec(set_headers(undefined | map(), emqx_types:message()) -> emqx_types:message()).
set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) ->
Msg#message{headers = Headers};
set_headers(New, Msg = #message{headers = Old}) when is_map(New) ->
Msg#message{headers = maps:merge(Old, New)}.
Msg#message{headers = maps:merge(Old, New)};
set_headers(undefined, Msg) -> Msg.
-spec(get_header(term(), emqx_types:message()) -> term()).
get_header(Hdr, Msg) ->

View File

@ -12,57 +12,56 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_acl_internal).
-module(emqx_mod_acl_internal).
-behaviour(emqx_acl_mod).
-behaviour(emqx_gen_mod).
-include("emqx.hrl").
-include("logger.hrl").
-export([load/1, unload/1]).
-export([all_rules/0]).
%% ACL mod callbacks
-export([init/1, check_acl/2, reload_acl/1, description/0]).
-export([check_acl/5, reload_acl/0]).
-define(ACL_RULE_TAB, emqx_acl_rule).
-type(state() :: #{acl_file := string()}).
-define(FUNC(M, F, A), {M, F, A}).
-type(acl_rules() :: #{publish => [emqx_access_rule:rule()],
subscribe => [emqx_access_rule:rule()]}).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
load(_Env) ->
Rules = load_rules_from_file(acl_file()),
emqx_hooks:add('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules]), -1).
unload(_Env) ->
Rules = load_rules_from_file(acl_file()),
emqx_hooks:del('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules])).
%% @doc Read all rules
-spec(all_rules() -> list(emqx_access_rule:rule())).
all_rules() ->
case ets:lookup(?ACL_RULE_TAB, all_rules) of
[] -> [];
[{_, Rules}] -> Rules
end.
load_rules_from_file(acl_file()).
%%------------------------------------------------------------------------------
%% ACL callbacks
%%------------------------------------------------------------------------------
-spec(init([File :: string()]) -> {ok, #{}}).
init([File]) ->
_ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
ok = load_rules_from_file(File),
{ok, #{acl_file => File}}.
load_rules_from_file(AclFile) ->
case file:consult(AclFile) of
{ok, Terms} ->
Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
lists:foreach(fun(PubSub) ->
ets:insert(?ACL_RULE_TAB, {PubSub,
lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
end, [publish, subscribe]),
ets:insert(?ACL_RULE_TAB, {all_rules, Terms}),
ok;
#{publish => lists:filter(fun(Rule) -> filter(publish, Rule) end, Rules),
subscribe => lists:filter(fun(Rule) -> filter(subscribe, Rule) end, Rules)};
{error, Reason} ->
emqx_logger:error("[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
{error, Reason}
?LOG(error, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
#{}
end.
filter(_PubSub, {allow, all}) ->
@ -79,20 +78,18 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
false.
%% @doc Check ACL
-spec(check_acl({emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic()}, #{})
-> allow | deny | ignore).
check_acl({Credentials, PubSub, Topic}, _State) ->
case match(Credentials, Topic, lookup(PubSub)) of
{matched, allow} -> allow;
{matched, deny} -> deny;
nomatch -> ignore
-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic(),
emqx_access_rule:acl_result(), acl_rules())
-> {ok, allow} | {ok, deny} | ok).
check_acl(Credentials, PubSub, Topic, _AclResult, Rules) ->
case match(Credentials, Topic, lookup(PubSub, Rules)) of
{matched, allow} -> {ok, allow};
{matched, deny} -> {ok, deny};
nomatch -> ok
end.
lookup(PubSub) ->
case ets:lookup(?ACL_RULE_TAB, PubSub) of
[] -> [];
[{PubSub, Rules}] -> Rules
end.
lookup(PubSub, Rules) ->
maps:get(PubSub, Rules, []).
match(_Credentials, _Topic, []) ->
nomatch;
@ -104,11 +101,11 @@ match(Credentials, Topic, [Rule|Rules]) ->
{matched, AllowDeny}
end.
-spec(reload_acl(state()) -> ok | {error, term()}).
reload_acl(#{acl_file := AclFile}) ->
try load_rules_from_file(AclFile) of
-spec(reload_acl() -> ok | {error, term()}).
reload_acl() ->
try load_rules_from_file(acl_file()) of
ok ->
emqx_logger:info("Reload acl_file ~s successfully", [AclFile]),
emqx_logger:info("Reload acl_file ~s successfully", [acl_file()]),
ok;
{error, Error} ->
{error, Error}
@ -118,6 +115,5 @@ reload_acl(#{acl_file := AclFile}) ->
{error, Reason}
end.
-spec(description() -> string()).
description() ->
"Internal ACL with etc/acl.conf".
acl_file() ->
emqx_config:get_env(acl_file).

View File

@ -18,6 +18,7 @@
-spec(load() -> ok).
load() ->
ok = emqx_mod_acl_internal:load([]),
lists:foreach(
fun({Mod, Env}) ->
ok = Mod:load(Env),
@ -26,6 +27,7 @@ load() ->
-spec(unload() -> ok).
unload() ->
ok = emqx_mod_acl_internal:unload([]),
lists:foreach(
fun({Mod, Env}) ->
Mod:unload(Env) end,

View File

@ -24,6 +24,8 @@
-export([list/0]).
-export([find_plugin/1]).
-export([load_expand_plugin/1]).
%% @doc Init plugins' config

View File

@ -57,7 +57,6 @@
will_msg,
keepalive,
mountpoint,
is_super,
is_bridge,
enable_ban,
enable_acl,
@ -68,7 +67,8 @@
connected_at,
ignore_loop,
topic_alias_maximum,
conn_mod
conn_mod,
credentials
}).
-opaque(state() :: #pstate{}).
@ -97,7 +97,6 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF
is_assigned = false,
conn_pid = self(),
username = init_username(Peercert, Options),
is_super = false,
clean_start = false,
topic_aliases = #{},
packet_size = emqx_zone:get_env(Zone, max_packet_size),
@ -111,7 +110,8 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF
connected = false,
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
topic_alias_maximum = #{to_client => 0, from_client => 0},
conn_mod = maps:get(conn_mod, SocketOpts, undefined)}.
conn_mod = maps:get(conn_mod, SocketOpts, undefined),
credentials = #{}}.
init_username(Peercert, Options) ->
case proplists:get_value(peer_cert_as_username, Options) of
@ -153,10 +153,10 @@ attrs(#pstate{zone = Zone,
proto_name = ProtoName,
keepalive = Keepalive,
mountpoint = Mountpoint,
is_super = IsSuper,
is_bridge = IsBridge,
connected_at = ConnectedAt,
conn_mod = ConnMod}) ->
conn_mod = ConnMod,
credentials = Credentials}) ->
[{zone, Zone},
{client_id, ClientId},
{username, Username},
@ -167,10 +167,11 @@ attrs(#pstate{zone = Zone,
{clean_start, CleanStart},
{keepalive, Keepalive},
{mountpoint, Mountpoint},
{is_super, IsSuper},
{is_bridge, IsBridge},
{connected_at, ConnectedAt},
{conn_mod, ConnMod}].
{conn_mod, ConnMod},
{credentials, Credentials}
].
attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Receive-Maximum', ConnProps, 65535);
@ -200,6 +201,8 @@ caps(#pstate{zone = Zone}) ->
client_id(#pstate{client_id = ClientId}) ->
ClientId.
credentials(#pstate{credentials = Credentials}) when map_size(Credentials) =/= 0 ->
Credentials;
credentials(#pstate{zone = Zone,
client_id = ClientId,
username = Username,
@ -362,8 +365,7 @@ process(?CONNECT_PACKET(
%% TODO: Mountpoint...
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
PState1 = set_username(Username,
PState0 = set_username(Username,
PState#pstate{client_id = NewClientId,
proto_ver = ProtoVer,
proto_name = ProtoName,
@ -372,20 +374,21 @@ process(?CONNECT_PACKET(
conn_props = ConnProps,
is_bridge = IsBridge,
connected_at = os:timestamp()}),
Credentials = credentials(PState0),
PState1 = PState0#pstate{credentials = Credentials},
connack(
case check_connect(ConnPkt, PState1) of
{ok, PState2} ->
case authenticate(credentials(PState2), Password) of
{ok, IsSuper} ->
%% Maybe assign a clientId
PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
ok ->
case emqx_access_control:authenticate(Credentials#{password => Password}) of
{ok, Credentials0} ->
PState3 = maybe_assign_client_id(PState1),
emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
%% Open session
SessAttrs = #{will_msg => make_will_msg(ConnPkt)},
case try_open_session(SessAttrs, PState3) of
{ok, SPid, SP} ->
PState4 = PState3#pstate{session = SPid, connected = true},
PState4 = PState3#pstate{session = SPid, connected = true,
credentials = maps:remove(password, Credentials0)},
ok = emqx_cm:register_connection(client_id(PState4)),
true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)),
%% Start keepalive
@ -394,11 +397,11 @@ process(?CONNECT_PACKET(
{?RC_SUCCESS, SP, PState4};
{error, Error} ->
?LOG(error, "Failed to open session: ~p", [Error]),
{?RC_UNSPECIFIED_ERROR, PState1}
{?RC_UNSPECIFIED_ERROR, PState1#pstate{credentials = Credentials0}}
end;
{error, Reason} ->
?LOG(error, "Username '~s' login failed for ~p", [Username, Reason]),
{?RC_NOT_AUTHORIZED, PState1}
?LOG(error, "Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]),
{emqx_reason_codes:connack_error(Reason), PState1#pstate{credentials = Credentials}}
end;
{error, ReasonCode} ->
{ReasonCode, PState1}
@ -406,8 +409,8 @@ process(?CONNECT_PACKET(
process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
case check_publish(Packet, PState) of
{ok, PState1} ->
do_publish(Packet, PState1);
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos0 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
@ -416,8 +419,8 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
case check_publish(Packet, PState) of
{ok, PState1} ->
do_publish(Packet, PState1);
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos1 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
@ -430,8 +433,8 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) ->
case check_publish(Packet, PState) of
{ok, PState1} ->
do_publish(Packet, PState1);
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos2 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
@ -480,16 +483,10 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
case check_subscribe(
parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of
{ok, TopicFilters} ->
case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of
{ok, TopicFilters1} ->
ok = emqx_session:subscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(Mountpoint, TopicFilters1)),
{ok, PState};
{stop, _} ->
ReasonCodes = lists:duplicate(length(TopicFilters),
?RC_IMPLEMENTATION_SPECIFIC_ERROR),
deliver({suback, PacketId, ReasonCodes}, PState)
end;
TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [credentials(PState)], TopicFilters),
ok = emqx_session:subscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(Mountpoint, TopicFilters0)),
{ok, PState};
{error, TopicFilters} ->
{SubTopics, ReasonCodes} =
lists:foldr(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) ->
@ -509,17 +506,11 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
case emqx_hooks:run('client.unsubscribe', [credentials(PState)],
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)) of
{ok, TopicFilters} ->
ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(MountPoint, TopicFilters)),
{ok, PState};
{stop, _Acc} ->
ReasonCodes = lists:duplicate(length(RawTopicFilters),
?RC_IMPLEMENTATION_SPECIFIC_ERROR),
deliver({unsuback, PacketId, ReasonCodes}, PState)
end;
TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [credentials(PState)],
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)),
ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(MountPoint, TopicFilters)),
{ok, PState};
process(?PACKET(?PINGREQ), PState) ->
send(?PACKET(?PINGRESP), PState);
@ -547,11 +538,11 @@ process(?DISCONNECT_PACKET(_), PState) ->
%%------------------------------------------------------------------------------
connack({?RC_SUCCESS, SP, PState}) ->
emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
ok = emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState));
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]),
ok = emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]),
[ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer),
_ = deliver({connack, ReasonCode1}, PState),
{error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}.
@ -607,27 +598,31 @@ deliver({connack, ReasonCode}, PState) ->
deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
proto_ver = ?MQTT_PROTO_V5,
client_id = ClientId,
conn_props = ConnProps,
is_assigned = IsAssigned,
topic_alias_maximum = TopicAliasMaximum}) ->
ResponseInformation = case maps:find('Request-Response-Information', ConnProps) of
{ok, 1} ->
iolist_to_binary(emqx_config:get_env(response_topic_prefix));
_ ->
<<>>
end,
#{max_packet_size := MaxPktSize,
max_qos_allowed := MaxQoS,
mqtt_retain_available := Retain,
max_topic_alias := MaxAlias,
mqtt_shared_subscription := Shared,
mqtt_wildcard_subscription := Wildcard} = caps(PState),
%% Response-Information is so far not set by broker.
%% i.e. It's a Client-to-Client contract for the request-response topic naming scheme.
%% According to MQTT 5.0 spec:
%% A common use of this is to pass a globally unique portion of the topic tree which
%% is reserved for this Client for at least the lifetime of its Session.
%% This often cannot just be a random name as both the requesting Client and the
%% responding Client need to be authorized to use it.
%% If we are to support it in the feature, the implementation should be flexible
%% to allow prefixing the response topic based on different ACL config.
%% e.g. prefix by username or client-id, so that unauthorized clients can not
%% subscribe requests or responses that are not intended for them.
Props = #{'Retain-Available' => flag(Retain),
'Maximum-Packet-Size' => MaxPktSize,
'Topic-Alias-Maximum' => MaxAlias,
'Wildcard-Subscription-Available' => flag(Wildcard),
'Subscription-Identifier-Available' => 1,
'Response-Information' => ResponseInformation,
%'Response-Information' =>
'Shared-Subscription-Available' => flag(Shared)},
Props1 = if
@ -656,8 +651,8 @@ deliver({connack, ReasonCode, SP}, PState) ->
send(?CONNACK_PACKET(ReasonCode, SP), PState);
deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
_ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg),
Msg1 = emqx_message:update_expiry(Msg),
Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg),
Msg1 = emqx_message:update_expiry(Msg0),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
send(emqx_packet:from_message(PacketId, emqx_message:remove_topic_alias(Msg2)), PState);
@ -688,9 +683,8 @@ deliver({disconnect, _ReasonCode}, PState) ->
-spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}).
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send}) ->
Data = emqx_frame:serialize(Packet, #{version => Ver}),
case Send(Data) of
ok ->
case Send(Packet, #{version => Ver}) of
{ok, Data} ->
trace(send, Packet),
emqx_metrics:sent(Packet),
emqx_metrics:trans(inc, 'bytes/sent', iolist_size(Data)),
@ -740,17 +734,6 @@ try_open_session(SessAttrs, PState = #pstate{zone = Zone,
Other -> Other
end.
authenticate(Credentials, Password) ->
case emqx_access_control:authenticate(Credentials, Password) of
ok -> {ok, false};
{ok, IsSuper} when is_boolean(IsSuper) ->
{ok, IsSuper};
{ok, Result} when is_map(Result) ->
{ok, maps:get(is_superuser, Result, false)};
{error, Error} ->
{error, Error}
end.
set_property(Name, Value, ?NO_PROPS) ->
#{Name => Value};
set_property(Name, Value, Props) ->
@ -851,25 +834,21 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret
#pstate{zone = Zone}) ->
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
ok;
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) ->
case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
allow -> ok;
deny ->
{error, ?RC_NOT_AUTHORIZED}
deny -> {error, ?RC_NOT_AUTHORIZED}
end.
run_check_steps([], _Packet, PState) ->
{ok, PState};
run_check_steps([], _Packet, _PState) ->
ok;
run_check_steps([Check|Steps], Packet, PState) ->
case Check(Packet, PState) of
ok ->
run_check_steps(Steps, Packet, PState);
{ok, PState1} ->
run_check_steps(Steps, Packet, PState1);
Error = {error, _RC} ->
Error
end.
@ -882,15 +861,13 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) ->
{error, TopicFilter1}
end.
check_sub_acl(TopicFilters, #pstate{is_super = IsSuper, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
{ok, TopicFilters};
check_sub_acl(TopicFilters, PState) ->
Credentials = credentials(PState),
lists:foldr(
fun({Topic, SubOpts}, {Ok, Acc}) ->
case emqx_access_control:check_acl(Credentials, subscribe, Topic) of
case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
allow -> {Ok, [{Topic, SubOpts}|Acc]};
deny ->
{error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}
@ -924,7 +901,7 @@ terminate(discard, _PState) ->
ok;
terminate(Reason, PState) ->
?LOG(info, "Shutdown for ~p", [Reason]),
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]).
ok = emqx_hooks:run('client.disconnected', [credentials(PState), Reason]).
start_keepalive(0, _PState) ->
ignore;

38
src/emqx_psk.erl Normal file
View File

@ -0,0 +1,38 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_psk).
-include("logger.hrl").
%% SSL PSK Callbacks
-export([lookup/3]).
-define(TAB, ?MODULE).
-type psk_identity() :: string().
-type psk_user_state() :: term().
-spec lookup(psk, psk_identity(), psk_user_state()) -> {ok, SharedSecret :: binary()} | error.
lookup(psk, ClientPSKID, _UserState) ->
try emqx_hooks:run_fold('tls_handshake.psk_lookup', [ClientPSKID], not_found) of
SharedSecret when is_binary(SharedSecret) -> {ok, SharedSecret};
Error ->
?LOG(error, "Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]),
error
catch
Except:Error:Stacktrace ->
?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]),
error
end.

View File

@ -17,7 +17,7 @@
-include("emqx_mqtt.hrl").
-export([name/2, text/1]).
-export([name/2, text/1, connack_error/1]).
-export([compat/2]).
name(I, Ver) when Ver >= ?MQTT_PROTO_V5 ->
@ -143,3 +143,14 @@ compat(suback, Code) when Code =< ?QOS_2 -> Code;
compat(suback, Code) when Code >= 16#80 -> 16#80;
compat(unsuback, _Code) -> undefined.
connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID;
connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
connack_error(username_or_password_undefined) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
connack_error(password_error) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
connack_error(not_authorized) -> ?RC_NOT_AUTHORIZED;
connack_error(server_unavailable) -> ?RC_SERVER_UNAVAILABLE;
connack_error(server_busy) -> ?RC_SERVER_BUSY;
connack_error(banned) -> ?RC_BANNED;
connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD;
connack_error(_) -> ?RC_NOT_AUTHORIZED.

View File

@ -21,10 +21,18 @@
-define(RPC, gen_rpc).
call(Node, Mod, Fun, Args) ->
?RPC:call(Node, Mod, Fun, Args).
filter_result(?RPC:call(Node, Mod, Fun, Args)).
multicall(Nodes, Mod, Fun, Args) ->
?RPC:multicall(Nodes, Mod, Fun, Args).
filter_result(?RPC:multicall(Nodes, Mod, Fun, Args)).
cast(Node, Mod, Fun, Args) ->
?RPC:cast(Node, Mod, Fun, Args).
filter_result(?RPC:cast(Node, Mod, Fun, Args)).
filter_result(Delivery) ->
case Delivery of
{badrpc, Reason} -> {badrpc, Reason};
{badtcp, Reason} -> {badrpc, Reason};
_ -> Delivery
end.

View File

@ -369,7 +369,7 @@ init([Parent, #{zone := Zone,
ok = emqx_sm:register_session(ClientId, self()),
true = emqx_sm:set_session_attrs(ClientId, attrs(State)),
true = emqx_sm:set_session_stats(ClientId, stats(State)),
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
ok = emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
ok = emqx_misc:init_proc_mng_policy(Zone),
ok = proc_lib:init_ack(Parent, {ok, self()}),
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
@ -466,22 +466,13 @@ handle_call(Req, _From, State) ->
handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
{[QoS|RcAcc], case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
SubMap;
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, SubOpts),
%% Why???
emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
maps:put(Topic, SubOpts, SubMap)
end}
end, {[], Subscriptions}, TopicFilters),
lists:foldr(
fun ({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) when
RC == ?QOS_0; RC == ?QOS_1; RC == ?QOS_2 ->
{[QoS|RcAcc], do_subscribe(ClientId, Username, Topic, SubOpts, SubMap)};
({_Topic, #{rc := RC}}, {RcAcc, SubMap}) ->
{[RC|RcAcc], SubMap}
end, {[], Subscriptions}, TopicFilters),
suback(FromPid, PacketId, ReasonCodes),
noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1}));
@ -493,7 +484,7 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
ok = emqx_broker:unsubscribe(Topic),
emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]),
ok = emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]),
{[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
error ->
{[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap}
@ -568,7 +559,7 @@ handle_cast({resume, #{conn_pid := ConnPid,
%% Clean Session: true -> false???
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
ok = emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
%% Replay delivery and Dequeue pending messages
noreply(ensure_stats_timer(dequeue(retry_delivery(true, State1))));
@ -668,7 +659,7 @@ terminate(Reason, #state{will_msg = WillMsg,
old_conn_pid = OldConnPid}) ->
send_willmsg(WillMsg),
[maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -941,7 +932,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use
if
Dropped =/= undefined ->
SessProps = #{client_id => ClientId, username => Username},
emqx_hooks:run('message.dropped', [SessProps, Msg]);
ok = emqx_hooks:run('message.dropped', [SessProps, Msg]);
true -> ok
end,
State#state{mqueue = NewQ}.
@ -980,7 +971,7 @@ await(PacketId, Msg, State = #state{inflight = Inflight}) ->
acked(puback, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, {_, Msg}, _Ts}} ->
emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg),
ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId]),
@ -990,7 +981,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, username = Username
acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, {_, Msg}, _Ts}} ->
emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg),
ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
{value, {pubrel, PacketId, _Ts}} ->
?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId]),
@ -1118,3 +1109,18 @@ noreply(State) ->
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.
do_subscribe(ClientId, Username, Topic, SubOpts, SubMap) ->
case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
SubMap;
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, SubOpts),
%% Why???
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
maps:put(Topic, SubOpts, SubMap)
end.

View File

@ -62,8 +62,6 @@ init([]) ->
%% Broker Sup
BrokerSup = supervisor_spec(emqx_broker_sup),
BridgeSup = supervisor_spec(emqx_bridge_sup),
%% AccessControl
AccessControl = worker_spec(emqx_access_control),
%% Session Manager
SMSup = supervisor_spec(emqx_sm_sup),
%% Connection Manager
@ -75,7 +73,6 @@ init([]) ->
RouterSup,
BrokerSup,
BridgeSup,
AccessControl,
SMSup,
CMSup,
SysSup]}}.

View File

@ -70,7 +70,7 @@ datetime() ->
%% @doc Get sys interval
-spec(sys_interval() -> pos_integer()).
sys_interval() ->
application:get_env(?APP, sys_interval, 60000).
application:get_env(?APP, broker_sys_interval, 60000).
%% @doc Get sys info
-spec(info() -> list(tuple())).

View File

@ -14,10 +14,13 @@
-module(emqx_topic).
-include("emqx_mqtt.hrl").
-export([match/2]).
-export([validate/1, validate/2]).
-export([levels/1]).
-export([triples/1]).
-export([tokens/1]).
-export([words/1]).
-export([wildcard/1]).
-export([join/1, prepend/2]).
@ -35,8 +38,6 @@
-define(MAX_TOPIC_LEN, 4096).
-include("emqx_mqtt.hrl").
%% @doc Is wildcard topic?
-spec(wildcard(topic() | words()) -> true | false).
wildcard(Topic) when is_binary(Topic) ->
@ -147,13 +148,19 @@ bin('#') -> <<"#">>;
bin(B) when is_binary(B) -> B;
bin(L) when is_list(L) -> list_to_binary(L).
-spec(levels(topic()) -> pos_integer()).
levels(Topic) when is_binary(Topic) ->
length(words(Topic)).
length(tokens(Topic)).
%% @doc Split topic to tokens.
-spec(tokens(topic()) -> list(binary())).
tokens(Topic) ->
binary:split(Topic, <<"/">>, [global]).
%% @doc Split Topic Path to Words
-spec(words(topic()) -> words()).
words(Topic) when is_binary(Topic) ->
[word(W) || W <- binary:split(Topic, <<"/">>, [global])].
[word(W) || W <- tokens(Topic)].
word(<<>>) -> '';
word(<<"+">>) -> '+';
@ -210,6 +217,8 @@ parse(Topic = <<?SHARE, "/", Topic1/binary>>, Options) ->
_ -> error({invalid_topic, Topic})
end
end;
parse(Topic, Options = #{qos := QoS}) ->
{Topic, Options#{rc => QoS}};
parse(Topic, Options) ->
{Topic, Options}.

View File

@ -143,12 +143,13 @@ websocket_init(#state{request = Req, options = Options}) ->
idle_timeout = IdleTimout}}.
send_fun(WsPid) ->
fun(Data) ->
fun(Packet, Options) ->
Data = emqx_frame:serialize(Packet, Options),
BinSize = iolist_size(Data),
emqx_pd:update_counter(send_cnt, 1),
emqx_pd:update_counter(send_oct, BinSize),
WsPid ! {binary, iolist_to_binary(Data)},
ok
{ok, Data}
end.
stat_fun() ->
@ -305,4 +306,3 @@ shutdown(Reason, State) ->
wsock_stats() ->
[{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS].

View File

@ -30,7 +30,7 @@ init_per_suite(Config) ->
[start_apps(App, {SchemaFile, ConfigFile}) ||
{App, SchemaFile, ConfigFile}
<- [{emqx, local_path("priv/emqx.schema"),
local_path("etc/emqx.conf")}]],
local_path("etc/gen.emqx.conf")}]],
Config.
end_per_suite(_Config) ->
@ -39,6 +39,16 @@ end_per_suite(_Config) ->
local_path(RelativePath) ->
filename:join([get_base_dir(), RelativePath]).
deps_path(App, RelativePath) ->
%% Note: not lib_dir because etc dir is not sym-link-ed to _build dir
%% but priv dir is
Path0 = code:priv_dir(App),
Path = case file:read_link(Path0) of
{ok, Resolved} -> Resolved;
{error, _} -> Path0
end,
filename:join([Path, "..", RelativePath]).
get_base_dir() ->
{file, Here} = code:is_loaded(?MODULE),
filename:dirname(filename:dirname(Here)).
@ -56,6 +66,9 @@ read_schema_configs(App, {SchemaFile, ConfigFile}) ->
Vals = proplists:get_value(App, NewConfig, []),
[application:set_env(App, Par, Value) || {Par, Value} <- Vals].
set_special_configs(emqx) ->
application:set_env(emqx, acl_file, deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
set_special_configs(_App) ->
ok.
@ -85,12 +98,12 @@ t_alarm_handler(_) ->
Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>),
Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>),
SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0},
emqx_client_sock:send(Sock,
emqx_client_sock:send(Sock,
raw_send_serialize(
?SUBSCRIBE_PACKET(
1,
1,
[{Topic1, SubOpts},
{Topic2, SubOpts}]),
{Topic2, SubOpts}]),
#{version => ?MQTT_PROTO_V5})),
{ok, Data2} = gen_tcp:recv(Sock, 0),
@ -119,16 +132,16 @@ t_alarm_handler(_) ->
t_logger_handler(_) ->
%% Meck supervisor report
logger:log(error, #{label => {supervisor, start_error},
report => [{supervisor, {local, tmp_sup}},
{errorContext, shutdown},
{reason, reached_max_restart_intensity},
logger:log(error, #{label => {supervisor, start_error},
report => [{supervisor, {local, tmp_sup}},
{errorContext, shutdown},
{reason, reached_max_restart_intensity},
{offender, [{pid, meck},
{id, meck},
{mfargs, {meck, start_link, []}},
{restart_type, permanent},
{shutdown, 5000},
{child_type, worker}]}]},
{child_type, worker}]}]},
#{logger_formatter => #{title => "SUPERVISOR REPORT"},
report_cb => fun logger:format_otp_report/1}),
?assertEqual(true, lists:keymember(supervisor_report, 1, emqx_alarm_handler:get_alarms())).
@ -142,4 +155,3 @@ raw_send_serialize(Packet, Opts) ->
raw_recv_parse(P, ProtoVersion) ->
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
version => ProtoVersion}}).

View File

@ -28,13 +28,8 @@ send_and_ack_test() ->
fun(Pid) -> Pid ! stop end),
meck:expect(emqx_client, publish, 2,
fun(Client, Msg) ->
case rand:uniform(200) of
1 ->
{error, {dummy, inflight_full}};
_ ->
Client ! {publish, Msg},
{ok, Msg} %% as packet id
end
Client ! {publish, Msg},
{ok, Msg} %% as packet id
end),
try
Max = 100,

View File

@ -72,7 +72,7 @@ publish(_) ->
ok = emqx:subscribe(<<"test/+">>),
timer:sleep(10),
emqx:publish(Msg),
?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
?assert(receive {dispatch, <<"test/+">>, #message{payload = <<"hello">>}} -> true after 5 -> false end).
dispatch_with_no_sub(_) ->
Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>),

View File

@ -32,8 +32,7 @@
<<"+/+">>, <<"TopicA/#">>]).
all() ->
[{group, mqttv4},
{group, mqttv5}].
[{group, mqttv4}].
groups() ->
[{mqttv4, [non_parallel_tests],
@ -44,10 +43,7 @@ groups() ->
%% keepalive_test,
redelivery_on_reconnect_test,
%% subscribe_failure_test,
dollar_topics_test]},
{mqttv5, [non_parallel_tests],
[request_response,
share_sub_request_topic]}].
dollar_topics_test]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
@ -56,89 +52,6 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
request_response_exception(QoS) ->
{ok, Client} = emqx_client:start_link([{proto_ver, v5},
{properties, #{ 'Request-Response-Information' => 0 }}]),
{ok, _} = emqx_client:connect(Client),
?assertError(no_response_information,
emqx_client:sub_request_topic(Client, QoS, <<"request_topic">>)),
ok = emqx_client:disconnect(Client).
request_response_per_qos(QoS) ->
{ok, Requester} = emqx_client:start_link([{proto_ver, v5},
{client_id, <<"requester">>},
{properties, #{ 'Request-Response-Information' => 1}}]),
{ok, _} = emqx_client:connect(Requester),
{ok, Responser} = emqx_client:start_link([{proto_ver, v5},
{client_id, <<"responser">>},
{properties, #{
'Request-Response-Information' => 1}},
{request_handler,
fun(_Req) -> <<"ResponseTest">> end}
]),
{ok, _} = emqx_client:connect(Responser),
ok = emqx_client:sub_request_topic(Responser, QoS, <<"request_topic">>),
{ok, <<"ResponseTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS),
ok = emqx_client:set_request_handler(Responser, fun(<<"request_payload">>) ->
<<"ResponseFunctionTest">>;
(_) ->
<<"404">>
end),
{ok, <<"ResponseFunctionTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS),
{ok, <<"404">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"invalid_request">>, QoS),
ok = emqx_client:disconnect(Responser),
ok = emqx_client:disconnect(Requester).
request_response(_Config) ->
request_response_per_qos(?QOS_2),
request_response_per_qos(?QOS_1),
request_response_per_qos(?QOS_0),
request_response_exception(?QOS_0),
request_response_exception(?QOS_1),
request_response_exception(?QOS_2).
share_sub_request_topic(_Config) ->
share_sub_request_topic_per_qos(?QOS_2),
share_sub_request_topic_per_qos(?QOS_1),
share_sub_request_topic_per_qos(?QOS_0).
share_sub_request_topic_per_qos(QoS) ->
application:set_env(?APPLICATION, shared_subscription_strategy, random),
ReqTopic = <<"request-topic">>,
RspTopic = <<"response-topic">>,
Group = <<"g1">>,
Properties = #{ 'Request-Response-Information' => 1},
Opts = fun(ClientId) -> [{proto_ver, v5},
{client_id, atom_to_binary(ClientId, utf8)},
{properties, Properties}
] end,
{ok, Requester} = emqx_client:start_link(Opts(requester)),
{ok, _} = emqx_client:connect(Requester),
{ok, Responser1} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]),
{ok, _} = emqx_client:connect(Responser1),
{ok, Responser2} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]),
{ok, _} = emqx_client:connect(Responser2),
ok = emqx_client:sub_request_topic(Responser1, QoS, ReqTopic, Group),
ok = emqx_client:sub_request_topic(Responser2, QoS, ReqTopic, Group),
%% Send a request, wait for response, validate response then return responser ID
ReqFun = fun(Req) ->
{ok, Rsp} = emqx_client:request(Requester, RspTopic, ReqTopic, Req, QoS),
case Rsp of
<<"1-", Req/binary>> -> 1;
<<"2-", Req/binary>> -> 2
end
end,
Ids = lists:map(fun(I) -> ReqFun(integer_to_binary(I)) end, lists:seq(1, 100)),
%% we are testing with random shared-dispatch strategy,
%% fail if not all responsers got a chance to handle requests
?assertEqual([1, 2], lists:usort(Ids)),
ok = emqx_client:disconnect(Responser1),
ok = emqx_client:disconnect(Responser2),
ok = emqx_client:disconnect(Requester).
receive_messages(Count) ->
receive_messages(Count, []).

View File

@ -21,7 +21,7 @@
-include_lib("common_test/include/ct.hrl").
all() ->
[add_delete_hook, run_hooks].
[add_delete_hook, run_hook].
add_delete_hook(_) ->
{ok, _} = emqx_hooks:start_link(),
@ -34,61 +34,82 @@ add_delete_hook(_) ->
?assertEqual(Callbacks, emqx_hooks:lookup(test_hook)),
ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1),
ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun2/1),
timer:sleep(1000),
timer:sleep(200),
?assertEqual([], emqx_hooks:lookup(test_hook)),
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 8),
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun1, []}, 9),
Callbacks2 = [{callback, {?MODULE, hook_fun1, []}, undefined, 9},
{callback, {?MODULE, hook_fun2, []}, undefined, 8}],
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun8, []}, 8),
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 2),
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun10, []}, 10),
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun9, []}, 9),
Callbacks2 = [{callback, {?MODULE, hook_fun10, []}, undefined, 10},
{callback, {?MODULE, hook_fun9, []}, undefined, 9},
{callback, {?MODULE, hook_fun8, []}, undefined, 8},
{callback, {?MODULE, hook_fun2, []}, undefined, 2}],
?assertEqual(Callbacks2, emqx_hooks:lookup(emqx_hook)),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun1, []}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2}),
timer:sleep(1000),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, []}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun8, []}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun9, []}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun10, []}),
timer:sleep(200),
?assertEqual([], emqx_hooks:lookup(emqx_hook)),
ok = emqx_hooks:stop().
run_hooks(_) ->
run_hook(_) ->
{ok, _} = emqx_hooks:start_link(),
ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]),
ok = emqx:hook(foldl_hook, {?MODULE, hook_fun3, [init]}),
ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]),
ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]),
{stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []),
{ok, []} = emqx:run_hooks(unknown_hook, [], []),
[r5,r4] = emqx:run_fold_hook(foldl_hook, [arg1, arg2], []),
[] = emqx:run_fold_hook(unknown_hook, [], []),
ok = emqx:hook(foldl_hook2, fun ?MODULE:hook_fun9/2),
ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}),
[r9] = emqx:run_fold_hook(foldl_hook2, [arg], []),
ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
{error, already_exists} = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
stop = emqx:run_hooks(foreach_hook, [arg]),
ok = emqx:run_hook(foreach_hook, [arg]),
ok = emqx:hook(foldl_hook2, fun ?MODULE:hook_fun9/2),
ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}),
{stop, []} = emqx:run_hooks(foldl_hook2, [arg], []),
ok = emqx:hook(foreach_filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0),
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg])), %% filter passed
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg1])), %% filter failed
ok = emqx:hook(filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0),
ok = emqx:run_hooks(filter1_hook, [arg]),
ok = emqx:hook(filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, []}),
{ok, []} = emqx:run_hooks(filter2_hook, [arg], []),
ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, [init_arg]}),
ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, {?MODULE, hook_filter2_1, [init_arg]}),
?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)),
?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)),
ok = emqx_hooks:stop().
hook_fun1([]) -> ok.
hook_fun2([]) -> {ok, []}.
hook_fun1(arg) -> ok;
hook_fun1(_) -> error.
hook_fun2(arg) -> ok;
hook_fun2(_) -> error.
hook_fun2(_, Acc) -> {ok, Acc + 1}.
hook_fun2_1(_, Acc) -> {ok, Acc + 1}.
hook_fun3(arg1, arg2, _Acc, init) -> ok.
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}.
hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}.
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r4 | Acc]}.
hook_fun5(arg1, arg2, Acc, init) -> {ok, [r5 | Acc]}.
hook_fun6(arg, initArg) -> ok.
hook_fun7(arg, initArg) -> any.
hook_fun8(arg, initArg) -> stop.
hook_fun7(arg, initArg) -> ok.
hook_fun8(arg, initArg) -> ok.
hook_fun9(arg, _Acc) -> any.
hook_fun10(arg, _Acc) -> stop.
hook_fun9(arg, Acc) -> {stop, [r9 | Acc]}.
hook_fun10(arg, Acc) -> {stop, [r10 | Acc]}.
hook_filter1(arg) -> true.
hook_filter2(arg, _Acc) -> true.
hook_filter1(arg) -> true;
hook_filter1(_) -> false.
hook_filter2(arg, _Acc, init_arg) -> true;
hook_filter2(_, _Acc, _IntArg) -> false.
hook_filter2_1(arg, _Acc, init_arg) -> true;
hook_filter2_1(arg1, _Acc, init_arg) -> true;
hook_filter2_1(_, _Acc, _IntArg) -> false.

View File

@ -47,12 +47,14 @@ timer_cancel_flush_test() ->
end.
shutdown_disabled_test() ->
ok = drain(),
self() ! foo,
?assertEqual(continue, conn_proc_mng_policy(0)),
receive foo -> ok end,
?assertEqual(hibernate, conn_proc_mng_policy(0)).
message_queue_too_long_test() ->
ok = drain(),
self() ! foo,
self() ! bar,
?assertEqual({shutdown, message_queue_too_long},
@ -63,3 +65,18 @@ message_queue_too_long_test() ->
conn_proc_mng_policy(L) ->
emqx_misc:conn_proc_mng_policy(#{message_queue_len => L}).
%% drain self() msg queue for deterministic test behavior
drain() ->
_ = drain([]), % maybe log
ok.
drain(Acc) ->
receive
Msg ->
drain([Msg | Acc])
after
0 ->
lists:reverse(Acc)
end.

View File

@ -31,67 +31,6 @@
username = <<"emqx">>,
password = <<"public">>})).
% -record(pstate, {
% zone,
% sendfun,
% peername,
% peercert,
% proto_ver,
% proto_name,
% client_id,
% is_assigned,
% conn_pid,
% conn_props,
% ack_props,
% username,
% session,
% clean_start,
% topic_aliases,
% packet_size,
% keepalive,
% mountpoint,
% is_super,
% is_bridge,
% enable_ban,
% enable_acl,
% acl_deny_action,
% recv_stats,
% send_stats,
% connected,
% connected_at,
% ignore_loop,
% topic_alias_maximum,
% conn_mod
% }).
% -define(TEST_PSTATE(ProtoVer, SendStats),
% #pstate{zone = test,
% sendfun = fun(_Packet, _Options) -> ok end,
% peername = test_peername,
% peercert = test_peercert,
% proto_ver = ProtoVer,
% proto_name = <<"MQTT">>,
% client_id = <<"test_pstate">>,
% is_assigned = false,
% conn_pid = self(),
% username = <<"emqx">>,
% is_super = false,
% clean_start = false,
% topic_aliases = #{},
% packet_size = 1000,
% mountpoint = <<>>,
% is_bridge = false,
% enable_ban = false,
% enable_acl = true,
% acl_deny_action = disconnect,
% recv_stats = #{msg => 0, pkt => 0},
% send_stats = SendStats,
% connected = false,
% ignore_loop = false,
% topic_alias_maximum = #{to_client => 0, from_client => 0},
% conn_mod = emqx_connection}).
all() ->
[
{group, mqtt_common},
@ -215,9 +154,10 @@ connect_v5(_) ->
#{'Request-Response-Information' => 1}})
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
#{'Response-Information' := _RespInfo}), _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5)
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5),
?assertNot(maps:is_key('Response-Information', Props)),
ok
end),
% topic alias = 0
@ -570,14 +510,6 @@ acl_deny_action_ct(_) ->
emqx_zone:set_env(external, acl_deny_action, ignore),
ok.
% acl_deny_action_eunit(_) ->
% PState = ?TEST_PSTATE(?MQTT_PROTO_V5, #{msg => 0, pkt => 0}),
% CodeName = emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ?MQTT_PROTO_V5),
% {error, CodeName, NEWPSTATE1} = emqx_protocol:process(?PUBLISH_PACKET(?QOS_1, <<"acl_deny_action">>, 1, <<"payload">>), PState),
% ?assertEqual(#{pkt => 1, msg => 0}, NEWPSTATE1#pstate.send_stats),
% {error, CodeName, NEWPSTATE2} = emqx_protocol:process(?PUBLISH_PACKET(?QOS_2, <<"acl_deny_action">>, 2, <<"payload">>), PState),
% ?assertEqual(#{pkt => 1, msg => 0}, NEWPSTATE2#pstate.send_stats).
will_topic_check(_) ->
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>},
{will_flag, true},
@ -614,6 +546,8 @@ acl_deny_do_disconnect(publish, QoS, Topic) ->
{ok, _} = emqx_client:connect(Client),
emqx_client:publish(Client, Topic, <<"test">>, QoS),
receive
{disconnected, shutdown, tcp_closed} ->
ct:pal(info, "[OK] after publish, client got disconnected: tcp_closed", []);
{'EXIT', Client, {shutdown,tcp_closed}} ->
ct:pal(info, "[OK] after publish, received exit: {shutdown,tcp_closed}"),
false = is_process_alive(Client);
@ -628,6 +562,8 @@ acl_deny_do_disconnect(subscribe, QoS, Topic) ->
{ok, _} = emqx_client:connect(Client),
{ok, _, [128]} = emqx_client:subscribe(Client, Topic, QoS),
receive
{disconnected, shutdown, tcp_closed} ->
ct:pal(info, "[OK] after subscribe, client got disconnected: tcp_closed", []);
{'EXIT', Client, {shutdown,tcp_closed}} ->
ct:pal(info, "[OK] after subscribe, received exit: {shutdown,tcp_closed}"),
false = is_process_alive(Client);

View File

@ -0,0 +1,102 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% @doc This module implements a request handler based on emqx_client.
%% A request handler is a MQTT client which subscribes to a request topic,
%% processes the requests then send response to another topic which is
%% subscribed by the request sender.
%% This code is in test directory because request and response are pure
%% client-side behaviours.
-module(emqx_request_handler).
-export([start_link/4, stop/1]).
-include("emqx_client.hrl").
-type qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos().
-type topic() :: emqx_topic:topic().
-type handler() :: fun((CorrData :: binary(), ReqPayload :: binary()) -> RspPayload :: binary()).
-spec start_link(topic(), qos(), handler(), emqx_client:options()) ->
{ok, pid()} | {error, any()}.
start_link(RequestTopic, QoS, RequestHandler, Options0) ->
Parent = self(),
MsgHandler = make_msg_handler(RequestHandler, Parent),
Options = [{msg_handler, MsgHandler} | Options0],
case emqx_client:start_link(Options) of
{ok, Pid} ->
{ok, _} = emqx_client:connect(Pid),
try subscribe(Pid, RequestTopic, QoS) of
ok -> {ok, Pid};
{error, _} = Error -> Error
catch
C : E : S ->
emqx_client:stop(Pid),
{error, {C, E, S}}
end;
{error, _} = Error -> Error
end.
stop(Pid) ->
emqx_client:disconnect(Pid).
make_msg_handler(RequestHandler, Parent) ->
#{publish => fun(Msg) -> handle_msg(Msg, RequestHandler, Parent) end,
puback => fun(_Ack) -> ok end,
disconnected => fun(_Reason) -> ok end
}.
handle_msg(ReqMsg, RequestHandler, Parent) ->
#{qos := QoS, properties := Props, payload := ReqPayload} = ReqMsg,
case maps:find('Response-Topic', Props) of
{ok, RspTopic} when RspTopic =/= <<>> ->
CorrData = maps:get('Correlation-Data', Props),
RspProps = maps:without(['Response-Topic'], Props),
RspPayload = RequestHandler(CorrData, ReqPayload),
RspMsg = #mqtt_msg{qos = QoS,
topic = RspTopic,
props = RspProps,
payload = RspPayload
},
emqx_logger:debug("~p sending response msg to topic ~s with~n"
"corr-data=~p~npayload=~p",
[?MODULE, RspTopic, CorrData, RspPayload]),
ok = send_response(RspMsg);
_ ->
Parent ! {discarded, ReqPayload},
ok
end.
send_response(Msg) ->
%% This function is evaluated by emqx_client itself.
%% hence delegate to another temp process for the loopback gen_statem call.
Client = self(),
_ = spawn_link(fun() ->
case emqx_client:publish(Client, Msg) of
ok -> ok;
{ok, _} -> ok;
{error, Reason} -> exit({failed_to_publish_response, Reason})
end
end),
ok.
subscribe(Client, Topic, QoS) ->
{ok, _Props, _QoS} =
emqx_client:subscribe(Client, [{Topic, [{rh, 2}, {rap, false},
{nl, true}, {qos, QoS}]}]),
ok.

View File

@ -0,0 +1,68 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_request_response_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]).
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
all() ->
[request_response].
request_response(_Config) ->
request_response_per_qos(?QOS_0),
request_response_per_qos(?QOS_1),
request_response_per_qos(?QOS_2).
request_response_per_qos(QoS) ->
ReqTopic = <<"request_topic">>,
RspTopic = <<"response_topic">>,
{ok, Requester} = emqx_request_sender:start_link(RspTopic, QoS,
[{proto_ver, v5},
{client_id, <<"requester">>},
{properties, #{ 'Request-Response-Information' => 1}}]),
%% This is a square service
Square = fun(_CorrData, ReqBin) ->
I = b2i(ReqBin),
i2b(I * I)
end,
{ok, Responser} = emqx_request_handler:start_link(ReqTopic, QoS, Square,
[{proto_ver, v5},
{client_id, <<"responser">>}
]),
ok = emqx_request_sender:send(Requester, ReqTopic, RspTopic, <<"corr-1">>, <<"2">>, QoS),
receive
{response, <<"corr-1">>, <<"4">>} ->
ok;
Other ->
erlang:error({unexpected, Other})
after
100 ->
erlang:error(timeout)
end,
ok = emqx_request_sender:stop(Requester),
ok = emqx_request_handler:stop(Responser).
b2i(B) -> binary_to_integer(B).
i2b(I) -> integer_to_binary(I).

View File

@ -0,0 +1,82 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% @doc This module implements a request sender based on emqx_client.
%% A request sender is a MQTT client which sends messages to a request
%% topic, and subscribes to another topic for responses.
%% This code is in test directory because request and response are pure
%% client-side behaviours.
-module(emqx_request_sender).
-export([start_link/3, stop/1, send/6]).
-include("emqx_client.hrl").
start_link(ResponseTopic, QoS, Options0) ->
Parent = self(),
MsgHandler = make_msg_handler(Parent),
Options = [{msg_handler, MsgHandler} | Options0],
case emqx_client:start_link(Options) of
{ok, Pid} ->
{ok, _} = emqx_client:connect(Pid),
try subscribe(Pid, ResponseTopic, QoS) of
ok -> {ok, Pid};
{error, _} = Error -> Error
catch
C : E : S ->
emqx_client:stop(Pid),
{error, {C, E, S}}
end;
{error, _} = Error -> Error
end.
%% @doc Send a message to request topic with correlation-data `CorrData'.
%% Response should be delivered as a `{response, CorrData, Payload}'
send(Client, ReqTopic, RspTopic, CorrData, Payload, QoS) ->
Props = #{'Response-Topic' => RspTopic,
'Correlation-Data' => CorrData
},
Msg = #mqtt_msg{qos = QoS,
topic = ReqTopic,
props = Props,
payload = Payload
},
case emqx_client:publish(Client, Msg) of
ok -> ok; %% QoS = 0
{ok, _} -> ok;
{error, _} = E -> E
end.
stop(Pid) ->
emqx_client:disconnect(Pid).
subscribe(Client, Topic, QoS) ->
case emqx_client:subscribe(Client, Topic, QoS) of
{ok, _, _} -> ok;
{error, _} = Error -> Error
end.
make_msg_handler(Parent) ->
#{publish => fun(Msg) -> handle_msg(Msg, Parent) end,
puback => fun(_Ack) -> ok end,
disconnected => fun(_Reason) -> ok end
}.
handle_msg(Msg, Parent) ->
#{properties := Props, payload := Payload} = Msg,
CorrData = maps:get('Correlation-Data', Props),
Parent ! {response, CorrData, Payload},
ok.

36
test/emqx_rpc_SUITE.erl Normal file
View File

@ -0,0 +1,36 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_rpc_SUITE).
-include("emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-compile(nowarn_export_all).
-define(MASTER, 'emqxct@127.0.0.1').
all() -> [t_rpc].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
t_rpc(_) ->
60000 = emqx_rpc:call(?MASTER, timer, seconds, [60]),
{badrpc, _} = emqx_rpc:call(?MASTER, os, test, []),
{_, []} = emqx_rpc:multicall([?MASTER, ?MASTER], os, timestamp, []).

View File

@ -20,8 +20,17 @@
-compile(export_all).
-compile(nowarn_export_all).
-import(emqx_topic, [wildcard/1, match/2, validate/1, triples/1, join/1,
words/1, systop/1, feed_var/3, parse/1]).
-import(emqx_topic,
[wildcard/1,
match/2,
validate/1,
triples/1,
join/1,
words/1,
systop/1,
feed_var/3,
parse/1
]).
-define(N, 10000).
@ -32,6 +41,7 @@ all() ->
t_triples,
t_join,
t_levels,
t_tokens,
t_words,
t_systop,
t_feed_var,
@ -165,6 +175,9 @@ t_triples_perf(_) ->
t_levels(_) ->
?assertEqual(4, emqx_topic:levels(<<"a/b/c/d">>)).
t_tokens(_) ->
?assertEqual([<<"a">>, <<"b">>, <<"+">>, <<"#">>], emqx_topic:tokens(<<"a/b/+/#">>)).
t_words(_) ->
['', <<"a">>, '+', '#'] = words(<<"/a/+/#">>),
['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'] = words(<<"/abkc/19383/+/akakdkkdkak/#">>),
@ -193,9 +206,12 @@ t_systop(_) ->
?assertEqual(SysTop2,systop(<<"abc">>)).
t_feed_var(_) ->
?assertEqual(<<"$queue/client/clientId">>, feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)),
?assertEqual(<<"username/test/client/x">>, feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>)),
?assertEqual(<<"username/test/client/clientId">>, feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>)).
?assertEqual(<<"$queue/client/clientId">>,
feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)),
?assertEqual(<<"username/test/client/x">>,
feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>)),
?assertEqual(<<"username/test/client/clientId">>,
feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>)).
long_topic() ->
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 10000)]).
@ -208,3 +224,4 @@ t_parse(_) ->
?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),
?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)),
?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)).