diff --git a/etc/emqx.conf b/etc/emqx.conf index 5361ab7da..cb2d0bb00 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -432,12 +432,6 @@ acl_deny_action = ignore ## MQTT Protocol ##-------------------------------------------------------------------- -## Response Topic Prefix -## -## Value: String -## Default: emqxrspv1 -mqtt.response_topic_prefix = emqxrspv1 - ## Maximum MQTT packet size allowed. ## ## Value: Bytes @@ -779,16 +773,6 @@ listener.tcp.external.active_n = 100 ## Value: String listener.tcp.external.zone = external -## Mountpoint of the MQTT/TCP Listener. All the topics will be prefixed -## with the mountpoint path if this option is enabled. -## -## Variables in mountpoint path: -## - %c: clientid -## - %u: username -## -## Value: String -## listener.tcp.external.mountpoint = devicebound/ - ## Rate limit for the external MQTT/TCP connections. Format is 'rate,burst'. ## ## Value: rate,burst @@ -918,13 +902,6 @@ listener.tcp.internal.active_n = 1000 ## Value: String listener.tcp.internal.zone = internal -## Mountpoint of the MQTT/TCP Listener. -## -## See: listener.tcp.$name.mountpoint -## -## Value: String -## listener.tcp.internal.mountpoint = internal/ - ## Rate limit for the internal MQTT/TCP connections. ## ## See: listener.tcp.$name.rate_limit @@ -1030,11 +1007,6 @@ listener.ssl.external.active_n = 100 ## Value: String listener.ssl.external.zone = external -## Mountpoint of the MQTT/SSL Listener. -## -## Value: String -## listener.ssl.external.mountpoint = devicebound/ - ## The access control rules for the MQTT/SSL listener. ## ## See: listener.tcp.$name.access @@ -1147,6 +1119,12 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## Value: Ciphers listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA +## Ciphers for TLS PSK. +## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot +## be configured at the same time. +## See 'https://tools.ietf.org/html/rfc4279#section-2'. +#listener.ssl.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA + ## SSL parameter renegotiation is a feature that allows a client and a server ## to renegotiate the parameters of the SSL connection on the fly. ## RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation, @@ -1281,13 +1259,6 @@ listener.ws.external.max_conn_rate = 1000 ## Value: String listener.ws.external.zone = external -## Mountpoint of the MQTT/WebSocket Listener. -## -## See: listener.tcp.$name.mountpoint -## -## Value: String -## listener.ws.external.mountpoint = devicebound/ - ## The access control for the MQTT/WebSocket listener. ## ## See: listener.tcp.$name.access @@ -1427,13 +1398,6 @@ listener.wss.external.max_conn_rate = 1000 ## Value: String listener.wss.external.zone = external -## Mountpoint of the MQTT/WebSocket/SSL Listener. -## -## See: listener.tcp.$name.mountpoint -## -## Value: String -## listener.wss.external.mountpoint = devicebound/ - ## The access control rules for the MQTT/WebSocket/SSL listener. ## ## See: listener.tcp.$name.access. @@ -1516,7 +1480,13 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## See: listener.ssl.$name.ciphers ## ## Value: Ciphers -## listener.wss.external.ciphers = +listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA + +## Ciphers for TLS PSK. +## Note that 'listener.wss.external.ciphers' and 'listener.wss.external.psk_ciphers' cannot +## be configured at the same time. +## See 'https://tools.ietf.org/html/rfc4279#section-2'. +#listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA ## See: listener.ssl.$name.secure_renegotiate ## @@ -1587,8 +1557,6 @@ listener.wss.external.send_timeout_close = on ## Value: true | false ## listener.wss.external.nodelay = true -listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA - ##-------------------------------------------------------------------- ## Bridges ##-------------------------------------------------------------------- @@ -1669,7 +1637,13 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## SSL Ciphers used by the bridge. ## ## Value: String -## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 +#bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 + +## Ciphers for TLS PSK. +## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot +## be configured at the same time. +## See 'https://tools.ietf.org/html/rfc4279#section-2'. +#bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA ## Ping interval of a down bridge. ## @@ -1829,6 +1803,12 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Value: String ## bridge.azure.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 +## Ciphers for TLS PSK. +## Note that 'bridge.*.ciphers' and 'bridge.*.psk_ciphers' cannot +## be configured at the same time. +## See 'https://tools.ietf.org/html/rfc4279#section-2'. +#bridge.azure.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA + ## Ping interval of a down bridge. ## ## Value: Duration diff --git a/include/emqx_client.hrl b/include/emqx_client.hrl index 535b8ad55..bf2f49283 100644 --- a/include/emqx_client.hrl +++ b/include/emqx_client.hrl @@ -15,6 +15,7 @@ -ifndef(EMQX_CLIENT_HRL). -define(EMQX_CLIENT_HRL, true). +-include("emqx_mqtt.hrl"). -record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, packet_id, topic, props, payload}). -endif. diff --git a/priv/emqx.schema b/priv/emqx.schema index 5161d5e94..caa0a2258 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -563,11 +563,6 @@ end}. %% MQTT Protocol %%-------------------------------------------------------------------- -%% @doc Response Topic Prefix -{mapping, "mqtt.response_topic_prefix", "emqx.response_topic_prefix",[ - {datatype, string} -]}. - %% @doc Max Packet Size Allowed, 1MB by default. {mapping, "mqtt.max_packet_size", "emqx.max_packet_size", [ {default, "1MB"}, @@ -866,15 +861,16 @@ end}. {force_shutdown_policy, ShutdownPolicy}; ("mqueue_priorities", Val) -> case Val of - "none" -> none; % NO_PRIORITY_TABLE + "none" -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE _ -> - lists:foldl(fun(T, Acc) -> + MqueuePriorities = lists:foldl(fun(T, Acc) -> %% NOTE: space in "= " is intended - [{Topic, Prio}] = string:tokens(T, "= "), + [Topic, Prio] = string:tokens(T, "= "), P = list_to_integer(Prio), (P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}), maps:put(iolist_to_binary(Topic), P, Acc) - end, string:tokens(Val, ",")) + end, #{}, string:tokens(Val, ",")), + {mqueue_priorities, MqueuePriorities} end; ("mountpoint", Val) -> {mountpoint, iolist_to_binary(Val)}; @@ -924,10 +920,6 @@ end}. {datatype, string} ]}. -{mapping, "listener.tcp.$name.mountpoint", "emqx.listeners", [ - {datatype, string} -]}. - {mapping, "listener.tcp.$name.rate_limit", "emqx.listeners", [ {default, undefined}, {datatype, string} @@ -1024,10 +1016,6 @@ end}. {datatype, string} ]}. -{mapping, "listener.ssl.$name.mountpoint", "emqx.listeners", [ - {datatype, string} -]}. - {mapping, "listener.ssl.$name.rate_limit", "emqx.listeners", [ {default, undefined}, {datatype, string} @@ -1098,6 +1086,10 @@ end}. {datatype, string} ]}. +{mapping, "listener.ssl.$name.psk_ciphers", "emqx.listeners", [ + {datatype, string} +]}. + {mapping, "listener.ssl.$name.handshake_timeout", "emqx.listeners", [ {default, "15s"}, {datatype, {duration, ms}} @@ -1174,10 +1166,6 @@ end}. {datatype, string} ]}. -{mapping, "listener.ws.$name.mountpoint", "emqx.listeners", [ - {datatype, string} -]}. - {mapping, "listener.ws.$name.rate_limit", "emqx.listeners", [ {default, undefined}, {datatype, string} @@ -1280,10 +1268,6 @@ end}. {datatype, string} ]}. -{mapping, "listener.wss.$name.mountpoint", "emqx.listeners", [ - {datatype, string} -]}. - {mapping, "listener.wss.$name.rate_limit", "emqx.listeners", [ {datatype, string} ]}. @@ -1368,6 +1352,10 @@ end}. {datatype, string} ]}. +{mapping, "listener.wss.$name.psk_ciphers", "emqx.listeners", [ + {datatype, string} +]}. + {mapping, "listener.wss.$name.keyfile", "emqx.listeners", [ {datatype, string} ]}. @@ -1423,8 +1411,6 @@ end}. end end, - MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end, - Ratelimit = fun(undefined) -> undefined; (S) -> @@ -1442,7 +1428,6 @@ end}. {rate_limit, Ratelimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))}, {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, - {mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))}, {verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)}, {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}, {proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)}, @@ -1460,14 +1445,40 @@ end}. end, SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, - + MapPSKCiphers = fun(PSKCiphers) -> + lists:map( + fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; + ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; + ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; + ("PSK-RC4-SHA") -> {psk, rc4_128, sha} + end, PSKCiphers) + end, SslOpts = fun(Prefix) -> - Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of + Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of undefined -> undefined; L -> [list_to_atom(V) || V <- L] end, + TLSCiphers = cuttlefish:conf_get(Prefix++".ciphers", Conf, undefined), + PSKCiphers = cuttlefish:conf_get(Prefix++".psk_ciphers", Conf, undefined), + Ciphers = + case {TLSCiphers, PSKCiphers} of + {undefined, undefined} -> + cuttlefish:invalid(Prefix++".ciphers or "++Prefix++".psk_ciphers is absent"); + {TLSCiphers, undefined} -> + SplitFun(TLSCiphers); + {undefined, PSKCiphers} -> + MapPSKCiphers(SplitFun(PSKCiphers)); + {_TLSCiphers, _PSKCiphers} -> + cuttlefish:invalid(Prefix++".ciphers and "++Prefix++".psk_ciphers cannot be configured at the same time") + end, + UserLookupFun = + case PSKCiphers of + undefined -> undefined; + _ -> {fun emqx_psk:lookup/3, <<>>} + end, Filter([{versions, Versions}, - {ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))}, + {ciphers, Ciphers}, + {user_lookup_fun, UserLookupFun}, {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)}, {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, @@ -1567,6 +1578,10 @@ end}. {datatype, string} ]}. +{mapping, "bridge.$name.psk_ciphers", "emqx.bridges", [ + {datatype, string} +]}. + {mapping, "bridge.$name.keepalive", "emqx.bridges", [ {default, "10s"}, {datatype, {duration, ms}} @@ -1622,22 +1637,34 @@ end}. ]}. {translation, "emqx.bridges", fun(Conf) -> + MapPSKCiphers = fun(PSKCiphers) -> + lists:map( + fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; + ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; + ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; + ("PSK-RC4-SHA") -> {psk, rc4_128, sha} + end, PSKCiphers) + end, + Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, IsSsl = fun(cacertfile) -> true; (certfile) -> true; (keyfile) -> true; (ciphers) -> true; + (psk_ciphers) -> true; (tls_versions) -> true; (_Opt) -> false end, Parse = fun(tls_versions, Vers) -> - {versions, [list_to_atom(S) || S <- Split(Vers)]}; + [{versions, [list_to_atom(S) || S <- Split(Vers)]}]; (ciphers, Ciphers) -> - {ciphers, Split(Ciphers)}; + [{ciphers, Split(Ciphers)}]; + (psk_ciphers, Ciphers) -> + [{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}]; (Opt, Val) -> - {Opt, Val} + [{Opt, Val}] end, Merge = fun(forwards, Val, Opts) -> @@ -1645,7 +1672,7 @@ end}. (Opt, Val, Opts) -> case IsSsl(Opt) of true -> - SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])], + SslOpts = Parse(Opt, Val) ++ [proplists:get_value(ssl_opts, Opts, [])], lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts)); false -> [{Opt, Val}|Opts] diff --git a/rebar.config b/rebar.config index 745c22f68..6fefc5bfb 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,7 @@ {deps, [{jsx, "2.9.0"}, {gproc, "0.8.0"}, - {cowboy, "2.4.0"} + {cowboy, "2.4.0"}, + {meck, "0.8.13"} %% temp workaround for version check ]}. %% appended to deps in rebar.config.script @@ -28,4 +29,3 @@ {plugins, [coveralls]}. -{profiles, [{test, [{deps, [{meck, "0.8.13"}]}]}]}. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 2275164a9..4ec28d2d9 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -17,12 +17,9 @@ -behaviour(gen_statem). -include("types.hrl"). --include("emqx_mqtt.hrl"). -include("emqx_client.hrl"). -export([start_link/0, start_link/1]). --export([request/5, request/6, request_async/7, receive_response/3]). --export([set_request_handler/2, sub_request_topic/3, sub_request_topic/4]). -export([connect/1]). -export([subscribe/2, subscribe/3, subscribe/4]). -export([publish/2, publish/3, publish/4, publish/5]). @@ -41,9 +38,7 @@ -export([initialized/3, waiting_for_connack/3, connected/3, inflight_full/3]). -export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). --export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, - request_input/0, response_payload/0, request_handler/0, - corr_data/0, mqtt_msg/0]). +-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, mqtt_msg/0]). -export_type([host/0, option/0]). @@ -57,24 +52,12 @@ -define(WILL_MSG(QoS, Retain, Topic, Props, Payload), #mqtt_msg{qos = QoS, retain = Retain, topic = Topic, props = Props, payload = Payload}). --define(RESPONSE_TIMEOUT_SECONDS, timer:seconds(5)). - --define(NO_REQ_HANDLER, undefined). - --define(NO_GROUP, <<>>). - -define(NO_CLIENT_ID, <<>>). -type(host() :: inet:ip_address() | inet:hostname()). --type(corr_data() :: binary()). - -%% NOTE: Message handler is different from request handler. -%% Message handler is a set of callbacks defined to handle MQTT messages as well as -%% the disconnect event. -%% Request handler is a callback to handle received MQTT message as in 'request', -%% and publish another MQTT message back to the defined topic as in 'response'. -%% `owner' and `msg_handler' has no effect when `request_handler' is set. +%% Message handler is a set of callbacks defined to handle MQTT messages +%% as well as the disconnect event. -define(NO_MSG_HDLR, undefined). -type(msg_handler() :: #{puback := fun((_) -> any()), publish := fun((emqx_types:message()) -> any()), @@ -100,7 +83,6 @@ | {keepalive, non_neg_integer()} | {max_inflight, pos_integer()} | {retry_interval, timeout()} - | {request_handler, request_handler()} | {will_topic, iodata()} | {will_payload, iodata()} | {will_retain, boolean()} @@ -146,7 +128,6 @@ ack_timer :: reference(), retry_interval :: pos_integer(), retry_timer :: reference(), - request_handler :: request_handler(), session_present :: boolean(), last_packet_id :: packet_id(), parse_state :: emqx_frame:state()}). @@ -176,35 +157,10 @@ -type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}). --type(request_input() :: binary()). - --type(response_payload() :: binary()). - --type(request_handler() :: fun((request_input()) -> response_payload())). - --type(group() :: binary()). - %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ -%% @doc Swap in a new request handler on the fly. --spec(set_request_handler(client(), request_handler()) -> ok). -set_request_handler(Responser, RequestHandler) -> - gen_statem:call(Responser, {set_request_handler, RequestHandler}). - -%% @doc Subscribe to request topic. --spec(sub_request_topic(client(), qos(), topic()) -> ok). -sub_request_topic(Client, QoS, Topic) -> - sub_request_topic(Client, QoS, Topic, ?NO_GROUP). - -%% @doc Share-subscribe to request topic. --spec(sub_request_topic(client(), qos(), topic(), group()) -> ok). -sub_request_topic(Client, QoS, Topic, Group) -> - Properties = get_properties(Client), - NewTopic = make_req_rsp_topic(Properties, Topic, Group), - subscribe_req_rsp_topic(Client, QoS, NewTopic). - -spec(start_link() -> gen_statem:start_ret()). start_link() -> start_link([]). @@ -293,76 +249,6 @@ parse_subopt([{nl, false} | Opts], Result) -> parse_subopt([{qos, QoS} | Opts], Result) -> parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}). --spec(request(client(), topic(), topic(), payload(), qos() | [pubopt()]) - -> ok | {ok, packet_id()} | {error, term()}). -request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), is_atom(QoS) -> - request(Client, ResponseTopic, RequestTopic, Payload, [{qos, ?QOS_I(QoS)}]); -request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), ?IS_QOS(QoS) -> - request(Client, ResponseTopic, RequestTopic, Payload, [{qos, QoS}]); -request(Client, ResponseTopic, RequestTopic, Payload, Opts) when is_binary(ResponseTopic), is_list(Opts) -> - request(Client, ResponseTopic, RequestTopic, Payload, Opts, _Properties = #{}). - -%% @doc Send a request to request topic and wait for response. --spec(request(client(), topic(), topic(), payload(), [pubopt()], properties()) - -> {ok, response_payload()} | {error, term()}). -request(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties) -> - CorrData = make_corr_data(), - case request_async(Client, ResponseTopic, RequestTopic, - Payload, Opts, Properties, CorrData) of - ok -> receive_response(Client, CorrData, Opts); - {error, Reason} -> {error, Reason} - end. - -%% @doc Get client properties. --spec(get_properties(client()) -> properties()). -get_properties(Client) -> gen_statem:call(Client, get_properties, infinity). - -%% @doc Send a request, but do not wait for response. -%% The caller should expect a `{publish, Response}' message, -%% or call `receive_response/3' to receive the message. --spec(request_async(client(), topic(), topic(), payload(), - [pubopt()], properties(), corr_data()) -> ok | {error, any()}). -request_async(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties, CorrData) - when is_binary(ResponseTopic), - is_binary(RequestTopic), - is_map(Properties), - is_list(Opts) -> - ok = emqx_mqtt_props:validate(Properties), - Retain = proplists:get_bool(retain, Opts), - QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)), - ClientProperties = get_properties(Client), - NewResponseTopic = make_req_rsp_topic(ClientProperties, ResponseTopic), - NewRequestTopic = make_req_rsp_topic(ClientProperties, RequestTopic), - %% This is perhaps not optimal to subscribe the response topic for - %% each and every request even though the response topic is always the same - ok = sub_response_topic(Client, QoS, NewResponseTopic), - NewProperties = maps:merge(Properties, #{'Response-Topic' => NewResponseTopic, - 'Correlation-Data' => CorrData}), - case publish(Client, #mqtt_msg{qos = QoS, - retain = Retain, - topic = NewRequestTopic, - props = NewProperties, - payload = iolist_to_binary(Payload)}) of - ok -> ok; - {ok, _PacketId} -> ok; %% assume auto_ack - {error, Reason} -> {error, Reason} - end. - -%% @doc Block wait the response for a request sent earlier. --spec(receive_response(client(), corr_data(), [pubopt()]) - -> {ok, response_payload()} | {error, any()}). -receive_response(Client, CorrData, Opts) -> - TimeOut = proplists:get_value(timeout, Opts, ?RESPONSE_TIMEOUT_SECONDS), - MRef = erlang:monitor(process, Client), - TRef = erlang:start_timer(TimeOut, self(), response), - try - receive_response(Client, CorrData, TRef, MRef) - after - erlang:cancel_timer(TRef), - receive {timeout, TRef, _} -> ok after 0 -> ok end, - erlang:demonitor(MRef, [flush]) - end. - -spec(publish(client(), topic(), payload()) -> ok | {error, term()}). publish(Client, Topic, Payload) when is_binary(Topic) -> publish(Client, #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)}). @@ -511,7 +397,6 @@ init([Options]) -> auto_ack = true, ack_timeout = ?DEFAULT_ACK_TIMEOUT, retry_interval = 0, - request_handler = ?NO_REQ_HANDLER, connect_timeout = ?DEFAULT_CONNECT_TIMEOUT, last_packet_id = 1}), {ok, initialized, init_parse_state(State)}. @@ -616,8 +501,6 @@ init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) -> init(Opts, State#state{auto_ack = AutoAck}); init([{retry_interval, I} | Opts], State) -> init(Opts, State#state{retry_interval = timer:seconds(I)}); -init([{request_handler, Handler} | Opts], State) -> - init(Opts, State#state{request_handler = Handler}); init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) -> init(Opts, State#state{bridge_mode = Mode}); init([_Opt | Opts], State) -> @@ -756,15 +639,9 @@ connected({call, From}, pause, State) -> connected({call, From}, resume, State) -> {keep_state, State#state{paused = false}, [{reply, From, ok}]}; -connected({call, From}, get_properties, #state{properties = Properties}) -> - {keep_state_and_data, [{reply, From, Properties}]}; - connected({call, From}, client_id, #state{client_id = ClientId}) -> {keep_state_and_data, [{reply, From, ClientId}]}; -connected({call, From}, {set_request_handler, RequestHandler}, State) -> - {keep_state, State#state{request_handler = RequestHandler}, [{reply, From, ok}]}; - connected({call, From}, SubReq = {subscribe, Properties, Topics}, State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) -> case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of @@ -846,25 +723,12 @@ connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) -> connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) -> keep_state_and_data; -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, Properties, Payload), - State) when Properties =/= undefined -> - NewState = response_publish(Properties, State, ?QOS_0, Payload), - {keep_state, deliver(packet_to_msg(Packet), NewState)}; connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) -> {keep_state, deliver(packet_to_msg(Packet), State)}; -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, Properties, Payload), State) - when Properties =/= undefined -> - NewState = response_publish(Properties, State, ?QOS_1, Payload), - publish_process(?QOS_1, Packet, NewState); - connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> publish_process(?QOS_1, Packet, State); -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, Properties, Payload), State) - when Properties =/= undefined -> - NewState = response_publish(Properties, State, ?QOS_2, Payload), - publish_process(?QOS_2, Packet, NewState); connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> publish_process(?QOS_2, Packet, State); @@ -1086,57 +950,6 @@ delete_inflight_when_full(Packet, State0) -> false -> {next_state, connected, State} end. -%% Subscribe to response topic. --spec(sub_response_topic(client(), qos(), topic()) -> ok). -sub_response_topic(Client, QoS, Topic) when is_binary(Topic) -> - subscribe_req_rsp_topic(Client, QoS, Topic). - -receive_response(Client, CorrData, TRef, MRef) -> - receive - {publish, Response} -> - {ok, Properties} = maps:find(properties, Response), - case maps:find('Correlation-Data', Properties) of - {ok, CorrData} -> - maps:find(payload, Response); - _ -> - emqx_logger:debug("Discarded stale response: ~p", [Response]), - receive_response(Client, CorrData, TRef, MRef) - end; - {timeout, TRef, response} -> - {error, timeout}; - {'DOWN', MRef, process, _, _} -> - {error, client_down} - end. - -%% Make a unique correlation data for each request. -%% It has to be unique because stale responses should be discarded. -make_corr_data() -> term_to_binary(make_ref()). - -%% Shared function for request and response topic subscription. -subscribe_req_rsp_topic(Client, QoS, Topic) -> - %% It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription - {ok, _Props, _QoS} = subscribe(Client, [{Topic, [{rh, 2}, {rap, false}, - {nl, not ?IS_SHARE(Topic)}, - {qos, QoS}]}]), - emqx_logger:debug("Subscribed to topic ~s", [Topic]), - ok. - -%% Make a request or response topic. -make_req_rsp_topic(Properties, Topic) -> - make_req_rsp_topic(Properties, Topic, ?NO_GROUP). - -%% Same as make_req_rsp_topic/2, but allow shared subscription (for request topics) -make_req_rsp_topic(Properties, Topic, Group) -> - case maps:find('Response-Information', Properties) of - {ok, ResponseInformation} when ResponseInformation =/= <<>> -> - emqx_topic:join([req_rsp_topic_prefix(Group, ResponseInformation), Topic]); - _ -> - erlang:error(no_response_information) - end. - -req_rsp_topic_prefix(?NO_GROUP, Prefix) -> Prefix; -req_rsp_topic_prefix(Group, Prefix) -> ?SHARE(Group, Prefix). - assign_id(?NO_CLIENT_ID, Props) -> case maps:find('Assigned-Client-Identifier', Props) of {ok, Value} -> @@ -1162,49 +975,6 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), Stop -> Stop end. -response_publish(#{'Response-Topic' := ResponseTopic} = Properties, - State = #state{request_handler = RequestHandler}, QoS, Payload) - when RequestHandler =/= ?NO_REQ_HANDLER -> - do_publish(ResponseTopic, Properties, State, QoS, Payload); -response_publish(_Properties, State, _QoS, _Payload) -> State. - -do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler}, ?QOS_0, Payload) -> - Msg = #mqtt_msg{qos = ?QOS_0, - retain = false, - topic = ResponseTopic, - props = Properties, - payload = RequestHandler(Payload) - }, - case send(Msg, State) of - {ok, NewState} -> NewState; - _Error -> State - end; -do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler, - inflight = Inflight, - last_packet_id = PacketId}, - QoS, Payload) - when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2)-> - case emqx_inflight:is_full(Inflight) of - true -> - emqx_logger:error("Inflight is full"), - State; - false -> - Msg = #mqtt_msg{packet_id = PacketId, - qos = QoS, - retain = false, - topic = ResponseTopic, - props = Properties, - payload = RequestHandler(Payload)}, - case send(Msg, State) of - {ok, NewState} -> - Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg, os:timestamp()}, Inflight), - ensure_retry_timer(NewState#state{inflight = Inflight1}); - {error, Reason} -> - emqx_logger:error("Send failed: ~p", [Reason]), - State - end - end. - ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) -> ensure_keepalive_timer(timer:seconds(Secs), State); ensure_keepalive_timer(State = #state{keepalive = 0}) -> @@ -1291,9 +1061,6 @@ retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) -> Error end. -deliver(_Msg, State = #state{request_handler = Hdlr}) when Hdlr =/= ?NO_REQ_HANDLER -> - %% message has been terminated by request handler, hence should not continue processing - State; deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, topic = Topic, props = Props, payload = Payload}, State) -> @@ -1303,17 +1070,17 @@ deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, ok = eval_msg_handler(State, publish, Msg), State. -eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER, +eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, owner = Owner}, disconnected, {ReasonCode, Properties}) -> %% Special handling for disconnected message when there is no handler callback Owner ! {disconnected, ReasonCode, Properties}, ok; -eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER}, +eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR}, disconnected, _OtherReason) -> %% do nothing to be backward compatible ok; -eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER, +eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, owner = Owner}, Kind, Msg) -> Owner ! {Kind, Msg}, ok; diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index b3ab3aa74..42989ec0c 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -28,6 +28,14 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% Multiple callbacks can be registered on a hookpoint. +%% The execution order depends on the priority value: +%% - Callbacks with greater priority values will be run before +%% the ones with lower priority values. e.g. A Callback with +%% priority = 2 precedes the callback with priority = 1. +%% - The execution order is the adding order of callbacks if they have +%% equal priority values. + -type(hookpoint() :: atom()). -type(action() :: function() | mfa()). -type(filter() :: function() | mfa()). @@ -56,14 +64,14 @@ stop() -> %%------------------------------------------------------------------------------ %% @doc Register a callback --spec(add(hookpoint(), action() | #callback{}) -> emqx_types:ok_or_error(already_exists)). +-spec(add(hookpoint(), action() | #callback{}) -> ok_or_error(already_exists)). add(HookPoint, Callback) when is_record(Callback, callback) -> gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity); add(HookPoint, Action) when is_function(Action); is_tuple(Action) -> add(HookPoint, #callback{action = Action, priority = 0}). -spec(add(hookpoint(), action(), filter() | integer() | list()) - -> emqx_types:ok_or_error(already_exists)). + -> ok_or_error(already_exists)). add(HookPoint, Action, InitArgs) when is_function(Action), is_list(InitArgs) -> add(HookPoint, #callback{action = {Action, InitArgs}, priority = 0}); add(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) -> @@ -72,8 +80,8 @@ add(HookPoint, Action, Priority) when is_integer(Priority) -> add(HookPoint, #callback{action = Action, priority = Priority}). -spec(add(hookpoint(), action(), filter(), integer()) - -> emqx_types:ok_or_error(already_exists)). -add(HookPoint, Action, Filter, Priority) -> + -> ok_or_error(already_exists)). +add(HookPoint, Action, Filter, Priority) when is_integer(Priority) -> add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). %% @doc Unregister a callback. @@ -87,17 +95,17 @@ run(HookPoint, Args) -> run_(lookup(HookPoint), Args). %% @doc Run hooks with Accumulator. --spec(run(atom(), list(Arg :: any()), any()) -> any()). +-spec(run(atom(), list(Arg::any()), Acc::any()) -> {ok, Acc::any()} | {stop, Acc::any()}). run(HookPoint, Args, Acc) -> run_(lookup(HookPoint), Args, Acc). %% @private run_([#callback{action = Action, filter = Filter} | Callbacks], Args) -> - case filtered(Filter, Args) orelse execute(Action, Args) of - true -> run_(Callbacks, Args); - ok -> run_(Callbacks, Args); - stop -> stop; - _Any -> run_(Callbacks, Args) + case filter_passed(Filter, Args) andalso execute(Action, Args) of + false -> run_(Callbacks, Args); + ok -> run_(Callbacks, Args); + stop -> stop; + _Any -> run_(Callbacks, Args) end; run_([], _Args) -> ok. @@ -105,8 +113,8 @@ run_([], _Args) -> %% @private run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) -> Args1 = Args ++ [Acc], - case filtered(Filter, Args1) orelse execute(Action, Args1) of - true -> run_(Callbacks, Args, Acc); + case filter_passed(Filter, Args1) andalso execute(Action, Args1) of + false -> run_(Callbacks, Args, Acc); ok -> run_(Callbacks, Args, Acc); {ok, NewAcc} -> run_(Callbacks, Args, NewAcc); stop -> {stop, Acc}; @@ -116,13 +124,14 @@ run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) -> run_([], _Args, Acc) -> {ok, Acc}. -filtered(undefined, _Args) -> - false; -filtered(Filter, Args) -> +-spec(filter_passed(filter(), Args::term()) -> true | false). +filter_passed(undefined, _Args) -> true; +filter_passed(Filter, Args) -> execute(Filter, Args). -execute(Action, Args) when is_function(Action) -> - erlang:apply(Action, Args); +%% @doc execute a function. +execute(Fun, Args) when is_function(Fun) -> + erlang:apply(Fun, Args); execute({Fun, InitArgs}, Args) when is_function(Fun) -> erlang:apply(Fun, Args ++ InitArgs); execute({M, F, A}, Args) -> @@ -142,11 +151,11 @@ lookup(HookPoint) -> %%------------------------------------------------------------------------------ init([]) -> - ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), + ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}, protected]), {ok, #{}}. handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) -> - Reply = case lists:keymember(Action, 2, Callbacks = lookup(HookPoint)) of + Reply = case lists:keymember(Action, #callback.action, Callbacks = lookup(HookPoint)) of true -> {error, already_exists}; false -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 9618a351e..96aae9c56 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -607,27 +607,31 @@ deliver({connack, ReasonCode}, PState) -> deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, proto_ver = ?MQTT_PROTO_V5, client_id = ClientId, - conn_props = ConnProps, is_assigned = IsAssigned, topic_alias_maximum = TopicAliasMaximum}) -> - ResponseInformation = case maps:find('Request-Response-Information', ConnProps) of - {ok, 1} -> - iolist_to_binary(emqx_config:get_env(response_topic_prefix)); - _ -> - <<>> - end, #{max_packet_size := MaxPktSize, max_qos_allowed := MaxQoS, mqtt_retain_available := Retain, max_topic_alias := MaxAlias, mqtt_shared_subscription := Shared, mqtt_wildcard_subscription := Wildcard} = caps(PState), + %% Response-Information is so far not set by broker. + %% i.e. It's a Client-to-Client contract for the request-response topic naming scheme. + %% According to MQTT 5.0 spec: + %% A common use of this is to pass a globally unique portion of the topic tree which + %% is reserved for this Client for at least the lifetime of its Session. + %% This often cannot just be a random name as both the requesting Client and the + %% responding Client need to be authorized to use it. + %% If we are to support it in the feature, the implementation should be flexible + %% to allow prefixing the response topic based on different ACL config. + %% e.g. prefix by username or client-id, so that unauthorized clients can not + %% subscribe requests or responses that are not intended for them. Props = #{'Retain-Available' => flag(Retain), 'Maximum-Packet-Size' => MaxPktSize, 'Topic-Alias-Maximum' => MaxAlias, 'Wildcard-Subscription-Available' => flag(Wildcard), 'Subscription-Identifier-Available' => 1, - 'Response-Information' => ResponseInformation, + %'Response-Information' => 'Shared-Subscription-Available' => flag(Shared)}, Props1 = if diff --git a/src/emqx_psk.erl b/src/emqx_psk.erl new file mode 100644 index 000000000..8062274ce --- /dev/null +++ b/src/emqx_psk.erl @@ -0,0 +1,36 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_psk). + +-include("logger.hrl"). + +%% SSL PSK Callbacks +-export([lookup/3]). + +-define(TAB, ?MODULE). + +-type psk_identity() :: string(). +-type psk_user_state() :: term(). + +-spec lookup(psk, psk_identity(), psk_user_state()) -> {ok, SharedSecret :: binary()} | error. +lookup(psk, ClientPSKID, UserState) -> + try emqx_hooks:run('tls_handshake.psk_lookup', [ClientPSKID], UserState) of + {ok, SharedSecret} -> {ok, SharedSecret}; + {stop, SharedSecret} -> {ok, SharedSecret} + catch + Except:Error:Stacktrace -> + ?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]), + error + end. \ No newline at end of file diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index bb615ccfe..0b18fdf23 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -14,10 +14,13 @@ -module(emqx_topic). +-include("emqx_mqtt.hrl"). + -export([match/2]). -export([validate/1, validate/2]). -export([levels/1]). -export([triples/1]). +-export([tokens/1]). -export([words/1]). -export([wildcard/1]). -export([join/1, prepend/2]). @@ -35,8 +38,6 @@ -define(MAX_TOPIC_LEN, 4096). --include("emqx_mqtt.hrl"). - %% @doc Is wildcard topic? -spec(wildcard(topic() | words()) -> true | false). wildcard(Topic) when is_binary(Topic) -> @@ -147,13 +148,19 @@ bin('#') -> <<"#">>; bin(B) when is_binary(B) -> B; bin(L) when is_list(L) -> list_to_binary(L). +-spec(levels(topic()) -> pos_integer()). levels(Topic) when is_binary(Topic) -> - length(words(Topic)). + length(tokens(Topic)). + +%% @doc Split topic to tokens. +-spec(tokens(topic()) -> list(binary())). +tokens(Topic) -> + binary:split(Topic, <<"/">>, [global]). %% @doc Split Topic Path to Words -spec(words(topic()) -> words()). words(Topic) when is_binary(Topic) -> - [word(W) || W <- binary:split(Topic, <<"/">>, [global])]. + [word(W) || W <- tokens(Topic)]. word(<<>>) -> ''; word(<<"+">>) -> '+'; diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl index e58385ebb..adec0b117 100644 --- a/test/emqx_alarm_handler_SUITE.erl +++ b/test/emqx_alarm_handler_SUITE.erl @@ -30,7 +30,7 @@ init_per_suite(Config) -> [start_apps(App, {SchemaFile, ConfigFile}) || {App, SchemaFile, ConfigFile} <- [{emqx, local_path("priv/emqx.schema"), - local_path("etc/emqx.conf")}]], + local_path("etc/gen.emqx.conf")}]], Config. end_per_suite(_Config) -> @@ -85,12 +85,12 @@ t_alarm_handler(_) -> Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>), Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>), SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}, - emqx_client_sock:send(Sock, + emqx_client_sock:send(Sock, raw_send_serialize( ?SUBSCRIBE_PACKET( - 1, + 1, [{Topic1, SubOpts}, - {Topic2, SubOpts}]), + {Topic2, SubOpts}]), #{version => ?MQTT_PROTO_V5})), {ok, Data2} = gen_tcp:recv(Sock, 0), @@ -119,16 +119,16 @@ t_alarm_handler(_) -> t_logger_handler(_) -> %% Meck supervisor report - logger:log(error, #{label => {supervisor, start_error}, - report => [{supervisor, {local, tmp_sup}}, - {errorContext, shutdown}, - {reason, reached_max_restart_intensity}, + logger:log(error, #{label => {supervisor, start_error}, + report => [{supervisor, {local, tmp_sup}}, + {errorContext, shutdown}, + {reason, reached_max_restart_intensity}, {offender, [{pid, meck}, {id, meck}, {mfargs, {meck, start_link, []}}, {restart_type, permanent}, {shutdown, 5000}, - {child_type, worker}]}]}, + {child_type, worker}]}]}, #{logger_formatter => #{title => "SUPERVISOR REPORT"}, report_cb => fun logger:format_otp_report/1}), ?assertEqual(true, lists:keymember(supervisor_report, 1, emqx_alarm_handler:get_alarms())). @@ -142,4 +142,3 @@ raw_send_serialize(Packet, Opts) -> raw_recv_parse(P, ProtoVersion) -> emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, version => ProtoVersion}}). - diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index 7b86f851c..5ac11f1af 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -32,8 +32,7 @@ <<"+/+">>, <<"TopicA/#">>]). all() -> - [{group, mqttv4}, - {group, mqttv5}]. + [{group, mqttv4}]. groups() -> [{mqttv4, [non_parallel_tests], @@ -44,10 +43,7 @@ groups() -> %% keepalive_test, redelivery_on_reconnect_test, %% subscribe_failure_test, - dollar_topics_test]}, - {mqttv5, [non_parallel_tests], - [request_response, - share_sub_request_topic]}]. + dollar_topics_test]}]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), @@ -56,89 +52,6 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). -request_response_exception(QoS) -> - {ok, Client} = emqx_client:start_link([{proto_ver, v5}, - {properties, #{ 'Request-Response-Information' => 0 }}]), - {ok, _} = emqx_client:connect(Client), - ?assertError(no_response_information, - emqx_client:sub_request_topic(Client, QoS, <<"request_topic">>)), - ok = emqx_client:disconnect(Client). - -request_response_per_qos(QoS) -> - {ok, Requester} = emqx_client:start_link([{proto_ver, v5}, - {client_id, <<"requester">>}, - {properties, #{ 'Request-Response-Information' => 1}}]), - {ok, _} = emqx_client:connect(Requester), - {ok, Responser} = emqx_client:start_link([{proto_ver, v5}, - {client_id, <<"responser">>}, - {properties, #{ - 'Request-Response-Information' => 1}}, - {request_handler, - fun(_Req) -> <<"ResponseTest">> end} - ]), - {ok, _} = emqx_client:connect(Responser), - ok = emqx_client:sub_request_topic(Responser, QoS, <<"request_topic">>), - {ok, <<"ResponseTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS), - ok = emqx_client:set_request_handler(Responser, fun(<<"request_payload">>) -> - <<"ResponseFunctionTest">>; - (_) -> - <<"404">> - end), - {ok, <<"ResponseFunctionTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS), - {ok, <<"404">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"invalid_request">>, QoS), - ok = emqx_client:disconnect(Responser), - ok = emqx_client:disconnect(Requester). - -request_response(_Config) -> - request_response_per_qos(?QOS_2), - request_response_per_qos(?QOS_1), - request_response_per_qos(?QOS_0), - request_response_exception(?QOS_0), - request_response_exception(?QOS_1), - request_response_exception(?QOS_2). - -share_sub_request_topic(_Config) -> - share_sub_request_topic_per_qos(?QOS_2), - share_sub_request_topic_per_qos(?QOS_1), - share_sub_request_topic_per_qos(?QOS_0). - -share_sub_request_topic_per_qos(QoS) -> - application:set_env(?APPLICATION, shared_subscription_strategy, random), - ReqTopic = <<"request-topic">>, - RspTopic = <<"response-topic">>, - Group = <<"g1">>, - Properties = #{ 'Request-Response-Information' => 1}, - Opts = fun(ClientId) -> [{proto_ver, v5}, - {client_id, atom_to_binary(ClientId, utf8)}, - {properties, Properties} - ] end, - {ok, Requester} = emqx_client:start_link(Opts(requester)), - {ok, _} = emqx_client:connect(Requester), - - {ok, Responser1} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]), - {ok, _} = emqx_client:connect(Responser1), - - {ok, Responser2} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]), - {ok, _} = emqx_client:connect(Responser2), - - ok = emqx_client:sub_request_topic(Responser1, QoS, ReqTopic, Group), - ok = emqx_client:sub_request_topic(Responser2, QoS, ReqTopic, Group), - %% Send a request, wait for response, validate response then return responser ID - ReqFun = fun(Req) -> - {ok, Rsp} = emqx_client:request(Requester, RspTopic, ReqTopic, Req, QoS), - case Rsp of - <<"1-", Req/binary>> -> 1; - <<"2-", Req/binary>> -> 2 - end - end, - Ids = lists:map(fun(I) -> ReqFun(integer_to_binary(I)) end, lists:seq(1, 100)), - %% we are testing with random shared-dispatch strategy, - %% fail if not all responsers got a chance to handle requests - ?assertEqual([1, 2], lists:usort(Ids)), - ok = emqx_client:disconnect(Responser1), - ok = emqx_client:disconnect(Responser2), - ok = emqx_client:disconnect(Requester). - receive_messages(Count) -> receive_messages(Count, []). diff --git a/test/emqx_hooks_SUITE.erl b/test/emqx_hooks_SUITE.erl index 864a3ab96..5fb7ed461 100644 --- a/test/emqx_hooks_SUITE.erl +++ b/test/emqx_hooks_SUITE.erl @@ -34,17 +34,23 @@ add_delete_hook(_) -> ?assertEqual(Callbacks, emqx_hooks:lookup(test_hook)), ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1), ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun2/1), - timer:sleep(1000), + timer:sleep(200), ?assertEqual([], emqx_hooks:lookup(test_hook)), - ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 8), - ok = emqx:hook(emqx_hook, {?MODULE, hook_fun1, []}, 9), - Callbacks2 = [{callback, {?MODULE, hook_fun1, []}, undefined, 9}, - {callback, {?MODULE, hook_fun2, []}, undefined, 8}], + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun8, []}, 8), + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 2), + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun10, []}, 10), + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun9, []}, 9), + Callbacks2 = [{callback, {?MODULE, hook_fun10, []}, undefined, 10}, + {callback, {?MODULE, hook_fun9, []}, undefined, 9}, + {callback, {?MODULE, hook_fun8, []}, undefined, 8}, + {callback, {?MODULE, hook_fun2, []}, undefined, 2}], ?assertEqual(Callbacks2, emqx_hooks:lookup(emqx_hook)), - ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun1, []}), - ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2}), - timer:sleep(1000), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, []}), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun8, []}), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun9, []}), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun10, []}), + timer:sleep(200), ?assertEqual([], emqx_hooks:lookup(emqx_hook)), ok = emqx_hooks:stop(). @@ -67,16 +73,27 @@ run_hooks(_) -> ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}), {stop, []} = emqx:run_hooks(foldl_hook2, [arg], []), - ok = emqx:hook(filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0), - ok = emqx:run_hooks(filter1_hook, [arg]), + %% foreach hook always returns 'ok' or 'stop' + ok = emqx:hook(foreach_filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0), + ?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg])), %% filter passed + ?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg1])), %% filter failed - ok = emqx:hook(filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, []}), - {ok, []} = emqx:run_hooks(filter2_hook, [arg], []), + %% foldl hook always returns {'ok', Acc} or {'stop', Acc} + ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, [init_arg]}), + ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, {?MODULE, hook_filter2_1, [init_arg]}), + ?assertEqual({ok, 3}, emqx:run_hooks(foldl_filter2_hook, [arg], 1)), + ?assertEqual({ok, 2}, emqx:run_hooks(foldl_filter2_hook, [arg1], 1)), ok = emqx_hooks:stop(). -hook_fun1([]) -> ok. -hook_fun2([]) -> {ok, []}. +hook_fun1(arg) -> ok; +hook_fun1(_) -> stop. + +hook_fun2(arg) -> ok; +hook_fun2(_) -> stop. + +hook_fun2(_, Acc) -> {ok, Acc + 1}. +hook_fun2_1(_, Acc) -> {ok, Acc + 1}. hook_fun3(arg1, arg2, _Acc, init) -> ok. hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. @@ -89,6 +106,12 @@ hook_fun8(arg, initArg) -> stop. hook_fun9(arg, _Acc) -> any. hook_fun10(arg, _Acc) -> stop. -hook_filter1(arg) -> true. -hook_filter2(arg, _Acc) -> true. +hook_filter1(arg) -> true; +hook_filter1(_) -> false. +hook_filter2(arg, _Acc, init_arg) -> true; +hook_filter2(_, _Acc, _IntArg) -> false. + +hook_filter2_1(arg, _Acc, init_arg) -> true; +hook_filter2_1(arg1, _Acc, init_arg) -> true; +hook_filter2_1(_, _Acc, _IntArg) -> false. diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 74271d685..0297d2615 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -154,9 +154,10 @@ connect_v5(_) -> #{'Request-Response-Information' => 1}}) )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, - #{'Response-Information' := _RespInfo}), _} = - raw_recv_parse(Data, ?MQTT_PROTO_V5) + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), _} = + raw_recv_parse(Data, ?MQTT_PROTO_V5), + ?assertNot(maps:is_key('Response-Information', Props)), + ok end), % topic alias = 0 diff --git a/test/emqx_request_handler.erl b/test/emqx_request_handler.erl new file mode 100644 index 000000000..d80288023 --- /dev/null +++ b/test/emqx_request_handler.erl @@ -0,0 +1,102 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% @doc This module implements a request handler based on emqx_client. +%% A request handler is a MQTT client which subscribes to a request topic, +%% processes the requests then send response to another topic which is +%% subscribed by the request sender. +%% This code is in test directory because request and response are pure +%% client-side behaviours. + +-module(emqx_request_handler). + +-export([start_link/4, stop/1]). + +-include("emqx_client.hrl"). + +-type qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos(). +-type topic() :: emqx_topic:topic(). +-type handler() :: fun((CorrData :: binary(), ReqPayload :: binary()) -> RspPayload :: binary()). + +-spec start_link(topic(), qos(), handler(), emqx_client:options()) -> + {ok, pid()} | {error, any()}. +start_link(RequestTopic, QoS, RequestHandler, Options0) -> + Parent = self(), + MsgHandler = make_msg_handler(RequestHandler, Parent), + Options = [{msg_handler, MsgHandler} | Options0], + case emqx_client:start_link(Options) of + {ok, Pid} -> + {ok, _} = emqx_client:connect(Pid), + try subscribe(Pid, RequestTopic, QoS) of + ok -> {ok, Pid}; + {error, _} = Error -> Error + catch + C : E : S -> + emqx_client:stop(Pid), + {error, {C, E, S}} + end; + {error, _} = Error -> Error + end. + +stop(Pid) -> + emqx_client:disconnect(Pid). + +make_msg_handler(RequestHandler, Parent) -> + #{publish => fun(Msg) -> handle_msg(Msg, RequestHandler, Parent) end, + puback => fun(_Ack) -> ok end, + disconnected => fun(_Reason) -> ok end + }. + +handle_msg(ReqMsg, RequestHandler, Parent) -> + #{qos := QoS, properties := Props, payload := ReqPayload} = ReqMsg, + case maps:find('Response-Topic', Props) of + {ok, RspTopic} when RspTopic =/= <<>> -> + CorrData = maps:get('Correlation-Data', Props), + RspProps = maps:without(['Response-Topic'], Props), + RspPayload = RequestHandler(CorrData, ReqPayload), + RspMsg = #mqtt_msg{qos = QoS, + topic = RspTopic, + props = RspProps, + payload = RspPayload + }, + emqx_logger:debug("~p sending response msg to topic ~s with~n" + "corr-data=~p~npayload=~p", + [?MODULE, RspTopic, CorrData, RspPayload]), + ok = send_response(RspMsg); + _ -> + Parent ! {discarded, ReqPayload}, + ok + end. + +send_response(Msg) -> + %% This function is evaluated by emqx_client itself. + %% hence delegate to another temp process for the loopback gen_statem call. + Client = self(), + _ = spawn_link(fun() -> + case emqx_client:publish(Client, Msg) of + ok -> ok; + {ok, _} -> ok; + {error, Reason} -> exit({failed_to_publish_response, Reason}) + end + end), + ok. + +subscribe(Client, Topic, QoS) -> + {ok, _Props, _QoS} = + emqx_client:subscribe(Client, [{Topic, [{rh, 2}, {rap, false}, + {nl, true}, {qos, QoS}]}]), + ok. + + + diff --git a/test/emqx_request_response_SUITE.erl b/test/emqx_request_response_SUITE.erl new file mode 100644 index 000000000..6709e958e --- /dev/null +++ b/test/emqx_request_response_SUITE.erl @@ -0,0 +1,68 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_request_response_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]). + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +all() -> + [request_response]. + +request_response(_Config) -> + request_response_per_qos(?QOS_0), + request_response_per_qos(?QOS_1), + request_response_per_qos(?QOS_2). + +request_response_per_qos(QoS) -> + ReqTopic = <<"request_topic">>, + RspTopic = <<"response_topic">>, + {ok, Requester} = emqx_request_sender:start_link(RspTopic, QoS, + [{proto_ver, v5}, + {client_id, <<"requester">>}, + {properties, #{ 'Request-Response-Information' => 1}}]), + %% This is a square service + Square = fun(_CorrData, ReqBin) -> + I = b2i(ReqBin), + i2b(I * I) + end, + {ok, Responser} = emqx_request_handler:start_link(ReqTopic, QoS, Square, + [{proto_ver, v5}, + {client_id, <<"responser">>} + ]), + ok = emqx_request_sender:send(Requester, ReqTopic, RspTopic, <<"corr-1">>, <<"2">>, QoS), + receive + {response, <<"corr-1">>, <<"4">>} -> + ok; + Other -> + erlang:error({unexpected, Other}) + after + 100 -> + erlang:error(timeout) + end, + ok = emqx_request_sender:stop(Requester), + ok = emqx_request_handler:stop(Responser). + +b2i(B) -> binary_to_integer(B). +i2b(I) -> integer_to_binary(I). diff --git a/test/emqx_request_sender.erl b/test/emqx_request_sender.erl new file mode 100644 index 000000000..e01db96c0 --- /dev/null +++ b/test/emqx_request_sender.erl @@ -0,0 +1,82 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% @doc This module implements a request sender based on emqx_client. +%% A request sender is a MQTT client which sends messages to a request +%% topic, and subscribes to another topic for responses. +%% This code is in test directory because request and response are pure +%% client-side behaviours. + +-module(emqx_request_sender). + +-export([start_link/3, stop/1, send/6]). + +-include("emqx_client.hrl"). + +start_link(ResponseTopic, QoS, Options0) -> + Parent = self(), + MsgHandler = make_msg_handler(Parent), + Options = [{msg_handler, MsgHandler} | Options0], + case emqx_client:start_link(Options) of + {ok, Pid} -> + {ok, _} = emqx_client:connect(Pid), + try subscribe(Pid, ResponseTopic, QoS) of + ok -> {ok, Pid}; + {error, _} = Error -> Error + catch + C : E : S -> + emqx_client:stop(Pid), + {error, {C, E, S}} + end; + {error, _} = Error -> Error + end. + +%% @doc Send a message to request topic with correlation-data `CorrData'. +%% Response should be delivered as a `{response, CorrData, Payload}' +send(Client, ReqTopic, RspTopic, CorrData, Payload, QoS) -> + Props = #{'Response-Topic' => RspTopic, + 'Correlation-Data' => CorrData + }, + Msg = #mqtt_msg{qos = QoS, + topic = ReqTopic, + props = Props, + payload = Payload + }, + case emqx_client:publish(Client, Msg) of + ok -> ok; %% QoS = 0 + {ok, _} -> ok; + {error, _} = E -> E + end. + +stop(Pid) -> + emqx_client:disconnect(Pid). + +subscribe(Client, Topic, QoS) -> + case emqx_client:subscribe(Client, Topic, QoS) of + {ok, _, _} -> ok; + {error, _} = Error -> Error + end. + +make_msg_handler(Parent) -> + #{publish => fun(Msg) -> handle_msg(Msg, Parent) end, + puback => fun(_Ack) -> ok end, + disconnected => fun(_Reason) -> ok end + }. + +handle_msg(Msg, Parent) -> + #{properties := Props, payload := Payload} = Msg, + CorrData = maps:get('Correlation-Data', Props), + Parent ! {response, CorrData, Payload}, + ok. + diff --git a/test/emqx_topic_SUITE.erl b/test/emqx_topic_SUITE.erl index 2ad57154d..0a51598ee 100644 --- a/test/emqx_topic_SUITE.erl +++ b/test/emqx_topic_SUITE.erl @@ -20,8 +20,17 @@ -compile(export_all). -compile(nowarn_export_all). --import(emqx_topic, [wildcard/1, match/2, validate/1, triples/1, join/1, - words/1, systop/1, feed_var/3, parse/1]). +-import(emqx_topic, + [wildcard/1, + match/2, + validate/1, + triples/1, + join/1, + words/1, + systop/1, + feed_var/3, + parse/1 + ]). -define(N, 10000). @@ -32,6 +41,7 @@ all() -> t_triples, t_join, t_levels, + t_tokens, t_words, t_systop, t_feed_var, @@ -165,6 +175,9 @@ t_triples_perf(_) -> t_levels(_) -> ?assertEqual(4, emqx_topic:levels(<<"a/b/c/d">>)). +t_tokens(_) -> + ?assertEqual([<<"a">>, <<"b">>, <<"+">>, <<"#">>], emqx_topic:tokens(<<"a/b/+/#">>)). + t_words(_) -> ['', <<"a">>, '+', '#'] = words(<<"/a/+/#">>), ['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'] = words(<<"/abkc/19383/+/akakdkkdkak/#">>), @@ -193,9 +206,12 @@ t_systop(_) -> ?assertEqual(SysTop2,systop(<<"abc">>)). t_feed_var(_) -> - ?assertEqual(<<"$queue/client/clientId">>, feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)), - ?assertEqual(<<"username/test/client/x">>, feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>)), - ?assertEqual(<<"username/test/client/clientId">>, feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>)). + ?assertEqual(<<"$queue/client/clientId">>, + feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)), + ?assertEqual(<<"username/test/client/x">>, + feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>)), + ?assertEqual(<<"username/test/client/clientId">>, + feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>)). long_topic() -> iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 10000)]). @@ -208,3 +224,4 @@ t_parse(_) -> ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)), ?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)), ?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)). +