Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-03-14 12:09:18 +08:00
commit e349136cb2
17 changed files with 512 additions and 476 deletions

View File

@ -432,12 +432,6 @@ acl_deny_action = ignore
## MQTT Protocol
##--------------------------------------------------------------------
## Response Topic Prefix
##
## Value: String
## Default: emqxrspv1
mqtt.response_topic_prefix = emqxrspv1
## Maximum MQTT packet size allowed.
##
## Value: Bytes
@ -779,16 +773,6 @@ listener.tcp.external.active_n = 100
## Value: String
listener.tcp.external.zone = external
## Mountpoint of the MQTT/TCP Listener. All the topics will be prefixed
## with the mountpoint path if this option is enabled.
##
## Variables in mountpoint path:
## - %c: clientid
## - %u: username
##
## Value: String
## listener.tcp.external.mountpoint = devicebound/
## Rate limit for the external MQTT/TCP connections. Format is 'rate,burst'.
##
## Value: rate,burst
@ -918,13 +902,6 @@ listener.tcp.internal.active_n = 1000
## Value: String
listener.tcp.internal.zone = internal
## Mountpoint of the MQTT/TCP Listener.
##
## See: listener.tcp.$name.mountpoint
##
## Value: String
## listener.tcp.internal.mountpoint = internal/
## Rate limit for the internal MQTT/TCP connections.
##
## See: listener.tcp.$name.rate_limit
@ -1030,11 +1007,6 @@ listener.ssl.external.active_n = 100
## Value: String
listener.ssl.external.zone = external
## Mountpoint of the MQTT/SSL Listener.
##
## Value: String
## listener.ssl.external.mountpoint = devicebound/
## The access control rules for the MQTT/SSL listener.
##
## See: listener.tcp.$name.access
@ -1147,6 +1119,12 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
## Value: Ciphers
listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA
## Ciphers for TLS PSK.
## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#listener.ssl.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## SSL parameter renegotiation is a feature that allows a client and a server
## to renegotiate the parameters of the SSL connection on the fly.
## RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation,
@ -1281,13 +1259,6 @@ listener.ws.external.max_conn_rate = 1000
## Value: String
listener.ws.external.zone = external
## Mountpoint of the MQTT/WebSocket Listener.
##
## See: listener.tcp.$name.mountpoint
##
## Value: String
## listener.ws.external.mountpoint = devicebound/
## The access control for the MQTT/WebSocket listener.
##
## See: listener.tcp.$name.access
@ -1427,13 +1398,6 @@ listener.wss.external.max_conn_rate = 1000
## Value: String
listener.wss.external.zone = external
## Mountpoint of the MQTT/WebSocket/SSL Listener.
##
## See: listener.tcp.$name.mountpoint
##
## Value: String
## listener.wss.external.mountpoint = devicebound/
## The access control rules for the MQTT/WebSocket/SSL listener.
##
## See: listener.tcp.$name.access.<no>
@ -1516,7 +1480,13 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
## See: listener.ssl.$name.ciphers
##
## Value: Ciphers
## listener.wss.external.ciphers =
listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA
## Ciphers for TLS PSK.
## Note that 'listener.wss.external.ciphers' and 'listener.wss.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## See: listener.ssl.$name.secure_renegotiate
##
@ -1587,8 +1557,6 @@ listener.wss.external.send_timeout_close = on
## Value: true | false
## listener.wss.external.nodelay = true
listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA
##--------------------------------------------------------------------
## Bridges
##--------------------------------------------------------------------
@ -1669,7 +1637,13 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
## SSL Ciphers used by the bridge.
##
## Value: String
## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
#bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## Ciphers for TLS PSK.
## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## Ping interval of a down bridge.
##
@ -1829,6 +1803,12 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
## Value: String
## bridge.azure.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## Ciphers for TLS PSK.
## Note that 'bridge.*.ciphers' and 'bridge.*.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#bridge.azure.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## Ping interval of a down bridge.
##
## Value: Duration

View File

@ -15,6 +15,7 @@
-ifndef(EMQX_CLIENT_HRL).
-define(EMQX_CLIENT_HRL, true).
-include("emqx_mqtt.hrl").
-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false,
packet_id, topic, props, payload}).
-endif.

View File

@ -563,11 +563,6 @@ end}.
%% MQTT Protocol
%%--------------------------------------------------------------------
%% @doc Response Topic Prefix
{mapping, "mqtt.response_topic_prefix", "emqx.response_topic_prefix",[
{datatype, string}
]}.
%% @doc Max Packet Size Allowed, 1MB by default.
{mapping, "mqtt.max_packet_size", "emqx.max_packet_size", [
{default, "1MB"},
@ -866,15 +861,16 @@ end}.
{force_shutdown_policy, ShutdownPolicy};
("mqueue_priorities", Val) ->
case Val of
"none" -> none; % NO_PRIORITY_TABLE
"none" -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE
_ ->
lists:foldl(fun(T, Acc) ->
MqueuePriorities = lists:foldl(fun(T, Acc) ->
%% NOTE: space in "= " is intended
[{Topic, Prio}] = string:tokens(T, "= "),
[Topic, Prio] = string:tokens(T, "= "),
P = list_to_integer(Prio),
(P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}),
maps:put(iolist_to_binary(Topic), P, Acc)
end, string:tokens(Val, ","))
end, #{}, string:tokens(Val, ",")),
{mqueue_priorities, MqueuePriorities}
end;
("mountpoint", Val) ->
{mountpoint, iolist_to_binary(Val)};
@ -924,10 +920,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.tcp.$name.mountpoint", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.tcp.$name.rate_limit", "emqx.listeners", [
{default, undefined},
{datatype, string}
@ -1024,10 +1016,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.ssl.$name.mountpoint", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.ssl.$name.rate_limit", "emqx.listeners", [
{default, undefined},
{datatype, string}
@ -1098,6 +1086,10 @@ end}.
{datatype, string}
]}.
{mapping, "listener.ssl.$name.psk_ciphers", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.ssl.$name.handshake_timeout", "emqx.listeners", [
{default, "15s"},
{datatype, {duration, ms}}
@ -1174,10 +1166,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.ws.$name.mountpoint", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.ws.$name.rate_limit", "emqx.listeners", [
{default, undefined},
{datatype, string}
@ -1280,10 +1268,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.wss.$name.mountpoint", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.wss.$name.rate_limit", "emqx.listeners", [
{datatype, string}
]}.
@ -1368,6 +1352,10 @@ end}.
{datatype, string}
]}.
{mapping, "listener.wss.$name.psk_ciphers", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.wss.$name.keyfile", "emqx.listeners", [
{datatype, string}
]}.
@ -1423,8 +1411,6 @@ end}.
end
end,
MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end,
Ratelimit = fun(undefined) ->
undefined;
(S) ->
@ -1442,7 +1428,6 @@ end}.
{rate_limit, Ratelimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))},
{proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)},
{proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
{mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))},
{verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
{proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
@ -1460,14 +1445,40 @@ end}.
end,
SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
MapPSKCiphers = fun(PSKCiphers) ->
lists:map(
fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha};
("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha};
("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
("PSK-RC4-SHA") -> {psk, rc4_128, sha}
end, PSKCiphers)
end,
SslOpts = fun(Prefix) ->
Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of
Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of
undefined -> undefined;
L -> [list_to_atom(V) || V <- L]
end,
TLSCiphers = cuttlefish:conf_get(Prefix++".ciphers", Conf, undefined),
PSKCiphers = cuttlefish:conf_get(Prefix++".psk_ciphers", Conf, undefined),
Ciphers =
case {TLSCiphers, PSKCiphers} of
{undefined, undefined} ->
cuttlefish:invalid(Prefix++".ciphers or "++Prefix++".psk_ciphers is absent");
{TLSCiphers, undefined} ->
SplitFun(TLSCiphers);
{undefined, PSKCiphers} ->
MapPSKCiphers(SplitFun(PSKCiphers));
{_TLSCiphers, _PSKCiphers} ->
cuttlefish:invalid(Prefix++".ciphers and "++Prefix++".psk_ciphers cannot be configured at the same time")
end,
UserLookupFun =
case PSKCiphers of
undefined -> undefined;
_ -> {fun emqx_psk:lookup/3, <<>>}
end,
Filter([{versions, Versions},
{ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))},
{ciphers, Ciphers},
{user_lookup_fun, UserLookupFun},
{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)},
{dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)},
{keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
@ -1567,6 +1578,10 @@ end}.
{datatype, string}
]}.
{mapping, "bridge.$name.psk_ciphers", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.keepalive", "emqx.bridges", [
{default, "10s"},
{datatype, {duration, ms}}
@ -1622,22 +1637,34 @@ end}.
]}.
{translation, "emqx.bridges", fun(Conf) ->
MapPSKCiphers = fun(PSKCiphers) ->
lists:map(
fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha};
("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha};
("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
("PSK-RC4-SHA") -> {psk, rc4_128, sha}
end, PSKCiphers)
end,
Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
IsSsl = fun(cacertfile) -> true;
(certfile) -> true;
(keyfile) -> true;
(ciphers) -> true;
(psk_ciphers) -> true;
(tls_versions) -> true;
(_Opt) -> false
end,
Parse = fun(tls_versions, Vers) ->
{versions, [list_to_atom(S) || S <- Split(Vers)]};
[{versions, [list_to_atom(S) || S <- Split(Vers)]}];
(ciphers, Ciphers) ->
{ciphers, Split(Ciphers)};
[{ciphers, Split(Ciphers)}];
(psk_ciphers, Ciphers) ->
[{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}];
(Opt, Val) ->
{Opt, Val}
[{Opt, Val}]
end,
Merge = fun(forwards, Val, Opts) ->
@ -1645,7 +1672,7 @@ end}.
(Opt, Val, Opts) ->
case IsSsl(Opt) of
true ->
SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])],
SslOpts = Parse(Opt, Val) ++ [proplists:get_value(ssl_opts, Opts, [])],
lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
false ->
[{Opt, Val}|Opts]

View File

@ -1,6 +1,7 @@
{deps, [{jsx, "2.9.0"},
{gproc, "0.8.0"},
{cowboy, "2.4.0"}
{cowboy, "2.4.0"},
{meck, "0.8.13"} %% temp workaround for version check
]}.
%% appended to deps in rebar.config.script
@ -28,4 +29,3 @@
{plugins, [coveralls]}.
{profiles, [{test, [{deps, [{meck, "0.8.13"}]}]}]}.

View File

@ -17,12 +17,9 @@
-behaviour(gen_statem).
-include("types.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_client.hrl").
-export([start_link/0, start_link/1]).
-export([request/5, request/6, request_async/7, receive_response/3]).
-export([set_request_handler/2, sub_request_topic/3, sub_request_topic/4]).
-export([connect/1]).
-export([subscribe/2, subscribe/3, subscribe/4]).
-export([publish/2, publish/3, publish/4, publish/5]).
@ -41,9 +38,7 @@
-export([initialized/3, waiting_for_connack/3, connected/3, inflight_full/3]).
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0,
request_input/0, response_payload/0, request_handler/0,
corr_data/0, mqtt_msg/0]).
-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, mqtt_msg/0]).
-export_type([host/0, option/0]).
@ -57,24 +52,12 @@
-define(WILL_MSG(QoS, Retain, Topic, Props, Payload),
#mqtt_msg{qos = QoS, retain = Retain, topic = Topic, props = Props, payload = Payload}).
-define(RESPONSE_TIMEOUT_SECONDS, timer:seconds(5)).
-define(NO_REQ_HANDLER, undefined).
-define(NO_GROUP, <<>>).
-define(NO_CLIENT_ID, <<>>).
-type(host() :: inet:ip_address() | inet:hostname()).
-type(corr_data() :: binary()).
%% NOTE: Message handler is different from request handler.
%% Message handler is a set of callbacks defined to handle MQTT messages as well as
%% the disconnect event.
%% Request handler is a callback to handle received MQTT message as in 'request',
%% and publish another MQTT message back to the defined topic as in 'response'.
%% `owner' and `msg_handler' has no effect when `request_handler' is set.
%% Message handler is a set of callbacks defined to handle MQTT messages
%% as well as the disconnect event.
-define(NO_MSG_HDLR, undefined).
-type(msg_handler() :: #{puback := fun((_) -> any()),
publish := fun((emqx_types:message()) -> any()),
@ -100,7 +83,6 @@
| {keepalive, non_neg_integer()}
| {max_inflight, pos_integer()}
| {retry_interval, timeout()}
| {request_handler, request_handler()}
| {will_topic, iodata()}
| {will_payload, iodata()}
| {will_retain, boolean()}
@ -146,7 +128,6 @@
ack_timer :: reference(),
retry_interval :: pos_integer(),
retry_timer :: reference(),
request_handler :: request_handler(),
session_present :: boolean(),
last_packet_id :: packet_id(),
parse_state :: emqx_frame:state()}).
@ -176,35 +157,10 @@
-type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}).
-type(request_input() :: binary()).
-type(response_payload() :: binary()).
-type(request_handler() :: fun((request_input()) -> response_payload())).
-type(group() :: binary()).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
%% @doc Swap in a new request handler on the fly.
-spec(set_request_handler(client(), request_handler()) -> ok).
set_request_handler(Responser, RequestHandler) ->
gen_statem:call(Responser, {set_request_handler, RequestHandler}).
%% @doc Subscribe to request topic.
-spec(sub_request_topic(client(), qos(), topic()) -> ok).
sub_request_topic(Client, QoS, Topic) ->
sub_request_topic(Client, QoS, Topic, ?NO_GROUP).
%% @doc Share-subscribe to request topic.
-spec(sub_request_topic(client(), qos(), topic(), group()) -> ok).
sub_request_topic(Client, QoS, Topic, Group) ->
Properties = get_properties(Client),
NewTopic = make_req_rsp_topic(Properties, Topic, Group),
subscribe_req_rsp_topic(Client, QoS, NewTopic).
-spec(start_link() -> gen_statem:start_ret()).
start_link() -> start_link([]).
@ -293,76 +249,6 @@ parse_subopt([{nl, false} | Opts], Result) ->
parse_subopt([{qos, QoS} | Opts], Result) ->
parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}).
-spec(request(client(), topic(), topic(), payload(), qos() | [pubopt()])
-> ok | {ok, packet_id()} | {error, term()}).
request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), is_atom(QoS) ->
request(Client, ResponseTopic, RequestTopic, Payload, [{qos, ?QOS_I(QoS)}]);
request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), ?IS_QOS(QoS) ->
request(Client, ResponseTopic, RequestTopic, Payload, [{qos, QoS}]);
request(Client, ResponseTopic, RequestTopic, Payload, Opts) when is_binary(ResponseTopic), is_list(Opts) ->
request(Client, ResponseTopic, RequestTopic, Payload, Opts, _Properties = #{}).
%% @doc Send a request to request topic and wait for response.
-spec(request(client(), topic(), topic(), payload(), [pubopt()], properties())
-> {ok, response_payload()} | {error, term()}).
request(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties) ->
CorrData = make_corr_data(),
case request_async(Client, ResponseTopic, RequestTopic,
Payload, Opts, Properties, CorrData) of
ok -> receive_response(Client, CorrData, Opts);
{error, Reason} -> {error, Reason}
end.
%% @doc Get client properties.
-spec(get_properties(client()) -> properties()).
get_properties(Client) -> gen_statem:call(Client, get_properties, infinity).
%% @doc Send a request, but do not wait for response.
%% The caller should expect a `{publish, Response}' message,
%% or call `receive_response/3' to receive the message.
-spec(request_async(client(), topic(), topic(), payload(),
[pubopt()], properties(), corr_data()) -> ok | {error, any()}).
request_async(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties, CorrData)
when is_binary(ResponseTopic),
is_binary(RequestTopic),
is_map(Properties),
is_list(Opts) ->
ok = emqx_mqtt_props:validate(Properties),
Retain = proplists:get_bool(retain, Opts),
QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)),
ClientProperties = get_properties(Client),
NewResponseTopic = make_req_rsp_topic(ClientProperties, ResponseTopic),
NewRequestTopic = make_req_rsp_topic(ClientProperties, RequestTopic),
%% This is perhaps not optimal to subscribe the response topic for
%% each and every request even though the response topic is always the same
ok = sub_response_topic(Client, QoS, NewResponseTopic),
NewProperties = maps:merge(Properties, #{'Response-Topic' => NewResponseTopic,
'Correlation-Data' => CorrData}),
case publish(Client, #mqtt_msg{qos = QoS,
retain = Retain,
topic = NewRequestTopic,
props = NewProperties,
payload = iolist_to_binary(Payload)}) of
ok -> ok;
{ok, _PacketId} -> ok; %% assume auto_ack
{error, Reason} -> {error, Reason}
end.
%% @doc Block wait the response for a request sent earlier.
-spec(receive_response(client(), corr_data(), [pubopt()])
-> {ok, response_payload()} | {error, any()}).
receive_response(Client, CorrData, Opts) ->
TimeOut = proplists:get_value(timeout, Opts, ?RESPONSE_TIMEOUT_SECONDS),
MRef = erlang:monitor(process, Client),
TRef = erlang:start_timer(TimeOut, self(), response),
try
receive_response(Client, CorrData, TRef, MRef)
after
erlang:cancel_timer(TRef),
receive {timeout, TRef, _} -> ok after 0 -> ok end,
erlang:demonitor(MRef, [flush])
end.
-spec(publish(client(), topic(), payload()) -> ok | {error, term()}).
publish(Client, Topic, Payload) when is_binary(Topic) ->
publish(Client, #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)}).
@ -511,7 +397,6 @@ init([Options]) ->
auto_ack = true,
ack_timeout = ?DEFAULT_ACK_TIMEOUT,
retry_interval = 0,
request_handler = ?NO_REQ_HANDLER,
connect_timeout = ?DEFAULT_CONNECT_TIMEOUT,
last_packet_id = 1}),
{ok, initialized, init_parse_state(State)}.
@ -616,8 +501,6 @@ init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) ->
init(Opts, State#state{auto_ack = AutoAck});
init([{retry_interval, I} | Opts], State) ->
init(Opts, State#state{retry_interval = timer:seconds(I)});
init([{request_handler, Handler} | Opts], State) ->
init(Opts, State#state{request_handler = Handler});
init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) ->
init(Opts, State#state{bridge_mode = Mode});
init([_Opt | Opts], State) ->
@ -756,15 +639,9 @@ connected({call, From}, pause, State) ->
connected({call, From}, resume, State) ->
{keep_state, State#state{paused = false}, [{reply, From, ok}]};
connected({call, From}, get_properties, #state{properties = Properties}) ->
{keep_state_and_data, [{reply, From, Properties}]};
connected({call, From}, client_id, #state{client_id = ClientId}) ->
{keep_state_and_data, [{reply, From, ClientId}]};
connected({call, From}, {set_request_handler, RequestHandler}, State) ->
{keep_state, State#state{request_handler = RequestHandler}, [{reply, From, ok}]};
connected({call, From}, SubReq = {subscribe, Properties, Topics},
State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) ->
case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of
@ -846,25 +723,12 @@ connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) ->
connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) ->
keep_state_and_data;
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, Properties, Payload),
State) when Properties =/= undefined ->
NewState = response_publish(Properties, State, ?QOS_0, Payload),
{keep_state, deliver(packet_to_msg(Packet), NewState)};
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) ->
{keep_state, deliver(packet_to_msg(Packet), State)};
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, Properties, Payload), State)
when Properties =/= undefined ->
NewState = response_publish(Properties, State, ?QOS_1, Payload),
publish_process(?QOS_1, Packet, NewState);
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
publish_process(?QOS_1, Packet, State);
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, Properties, Payload), State)
when Properties =/= undefined ->
NewState = response_publish(Properties, State, ?QOS_2, Payload),
publish_process(?QOS_2, Packet, NewState);
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
publish_process(?QOS_2, Packet, State);
@ -1086,57 +950,6 @@ delete_inflight_when_full(Packet, State0) ->
false -> {next_state, connected, State}
end.
%% Subscribe to response topic.
-spec(sub_response_topic(client(), qos(), topic()) -> ok).
sub_response_topic(Client, QoS, Topic) when is_binary(Topic) ->
subscribe_req_rsp_topic(Client, QoS, Topic).
receive_response(Client, CorrData, TRef, MRef) ->
receive
{publish, Response} ->
{ok, Properties} = maps:find(properties, Response),
case maps:find('Correlation-Data', Properties) of
{ok, CorrData} ->
maps:find(payload, Response);
_ ->
emqx_logger:debug("Discarded stale response: ~p", [Response]),
receive_response(Client, CorrData, TRef, MRef)
end;
{timeout, TRef, response} ->
{error, timeout};
{'DOWN', MRef, process, _, _} ->
{error, client_down}
end.
%% Make a unique correlation data for each request.
%% It has to be unique because stale responses should be discarded.
make_corr_data() -> term_to_binary(make_ref()).
%% Shared function for request and response topic subscription.
subscribe_req_rsp_topic(Client, QoS, Topic) ->
%% It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription
{ok, _Props, _QoS} = subscribe(Client, [{Topic, [{rh, 2}, {rap, false},
{nl, not ?IS_SHARE(Topic)},
{qos, QoS}]}]),
emqx_logger:debug("Subscribed to topic ~s", [Topic]),
ok.
%% Make a request or response topic.
make_req_rsp_topic(Properties, Topic) ->
make_req_rsp_topic(Properties, Topic, ?NO_GROUP).
%% Same as make_req_rsp_topic/2, but allow shared subscription (for request topics)
make_req_rsp_topic(Properties, Topic, Group) ->
case maps:find('Response-Information', Properties) of
{ok, ResponseInformation} when ResponseInformation =/= <<>> ->
emqx_topic:join([req_rsp_topic_prefix(Group, ResponseInformation), Topic]);
_ ->
erlang:error(no_response_information)
end.
req_rsp_topic_prefix(?NO_GROUP, Prefix) -> Prefix;
req_rsp_topic_prefix(Group, Prefix) -> ?SHARE(Group, Prefix).
assign_id(?NO_CLIENT_ID, Props) ->
case maps:find('Assigned-Client-Identifier', Props) of
{ok, Value} ->
@ -1162,49 +975,6 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId),
Stop -> Stop
end.
response_publish(#{'Response-Topic' := ResponseTopic} = Properties,
State = #state{request_handler = RequestHandler}, QoS, Payload)
when RequestHandler =/= ?NO_REQ_HANDLER ->
do_publish(ResponseTopic, Properties, State, QoS, Payload);
response_publish(_Properties, State, _QoS, _Payload) -> State.
do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler}, ?QOS_0, Payload) ->
Msg = #mqtt_msg{qos = ?QOS_0,
retain = false,
topic = ResponseTopic,
props = Properties,
payload = RequestHandler(Payload)
},
case send(Msg, State) of
{ok, NewState} -> NewState;
_Error -> State
end;
do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler,
inflight = Inflight,
last_packet_id = PacketId},
QoS, Payload)
when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2)->
case emqx_inflight:is_full(Inflight) of
true ->
emqx_logger:error("Inflight is full"),
State;
false ->
Msg = #mqtt_msg{packet_id = PacketId,
qos = QoS,
retain = false,
topic = ResponseTopic,
props = Properties,
payload = RequestHandler(Payload)},
case send(Msg, State) of
{ok, NewState} ->
Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg, os:timestamp()}, Inflight),
ensure_retry_timer(NewState#state{inflight = Inflight1});
{error, Reason} ->
emqx_logger:error("Send failed: ~p", [Reason]),
State
end
end.
ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) ->
ensure_keepalive_timer(timer:seconds(Secs), State);
ensure_keepalive_timer(State = #state{keepalive = 0}) ->
@ -1291,9 +1061,6 @@ retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) ->
Error
end.
deliver(_Msg, State = #state{request_handler = Hdlr}) when Hdlr =/= ?NO_REQ_HANDLER ->
%% message has been terminated by request handler, hence should not continue processing
State;
deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
topic = Topic, props = Props, payload = Payload},
State) ->
@ -1303,17 +1070,17 @@ deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
ok = eval_msg_handler(State, publish, Msg),
State.
eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER,
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR,
owner = Owner},
disconnected, {ReasonCode, Properties}) ->
%% Special handling for disconnected message when there is no handler callback
Owner ! {disconnected, ReasonCode, Properties},
ok;
eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER},
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR},
disconnected, _OtherReason) ->
%% do nothing to be backward compatible
ok;
eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER,
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR,
owner = Owner}, Kind, Msg) ->
Owner ! {Kind, Msg},
ok;

View File

@ -28,6 +28,14 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
%% Multiple callbacks can be registered on a hookpoint.
%% The execution order depends on the priority value:
%% - Callbacks with greater priority values will be run before
%% the ones with lower priority values. e.g. A Callback with
%% priority = 2 precedes the callback with priority = 1.
%% - The execution order is the adding order of callbacks if they have
%% equal priority values.
-type(hookpoint() :: atom()).
-type(action() :: function() | mfa()).
-type(filter() :: function() | mfa()).
@ -56,14 +64,14 @@ stop() ->
%%------------------------------------------------------------------------------
%% @doc Register a callback
-spec(add(hookpoint(), action() | #callback{}) -> emqx_types:ok_or_error(already_exists)).
-spec(add(hookpoint(), action() | #callback{}) -> ok_or_error(already_exists)).
add(HookPoint, Callback) when is_record(Callback, callback) ->
gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity);
add(HookPoint, Action) when is_function(Action); is_tuple(Action) ->
add(HookPoint, #callback{action = Action, priority = 0}).
-spec(add(hookpoint(), action(), filter() | integer() | list())
-> emqx_types:ok_or_error(already_exists)).
-> ok_or_error(already_exists)).
add(HookPoint, Action, InitArgs) when is_function(Action), is_list(InitArgs) ->
add(HookPoint, #callback{action = {Action, InitArgs}, priority = 0});
add(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) ->
@ -72,8 +80,8 @@ add(HookPoint, Action, Priority) when is_integer(Priority) ->
add(HookPoint, #callback{action = Action, priority = Priority}).
-spec(add(hookpoint(), action(), filter(), integer())
-> emqx_types:ok_or_error(already_exists)).
add(HookPoint, Action, Filter, Priority) ->
-> ok_or_error(already_exists)).
add(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
%% @doc Unregister a callback.
@ -87,17 +95,17 @@ run(HookPoint, Args) ->
run_(lookup(HookPoint), Args).
%% @doc Run hooks with Accumulator.
-spec(run(atom(), list(Arg :: any()), any()) -> any()).
-spec(run(atom(), list(Arg::any()), Acc::any()) -> {ok, Acc::any()} | {stop, Acc::any()}).
run(HookPoint, Args, Acc) ->
run_(lookup(HookPoint), Args, Acc).
%% @private
run_([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
case filtered(Filter, Args) orelse execute(Action, Args) of
true -> run_(Callbacks, Args);
ok -> run_(Callbacks, Args);
stop -> stop;
_Any -> run_(Callbacks, Args)
case filter_passed(Filter, Args) andalso execute(Action, Args) of
false -> run_(Callbacks, Args);
ok -> run_(Callbacks, Args);
stop -> stop;
_Any -> run_(Callbacks, Args)
end;
run_([], _Args) ->
ok.
@ -105,8 +113,8 @@ run_([], _Args) ->
%% @private
run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
Args1 = Args ++ [Acc],
case filtered(Filter, Args1) orelse execute(Action, Args1) of
true -> run_(Callbacks, Args, Acc);
case filter_passed(Filter, Args1) andalso execute(Action, Args1) of
false -> run_(Callbacks, Args, Acc);
ok -> run_(Callbacks, Args, Acc);
{ok, NewAcc} -> run_(Callbacks, Args, NewAcc);
stop -> {stop, Acc};
@ -116,13 +124,14 @@ run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
run_([], _Args, Acc) ->
{ok, Acc}.
filtered(undefined, _Args) ->
false;
filtered(Filter, Args) ->
-spec(filter_passed(filter(), Args::term()) -> true | false).
filter_passed(undefined, _Args) -> true;
filter_passed(Filter, Args) ->
execute(Filter, Args).
execute(Action, Args) when is_function(Action) ->
erlang:apply(Action, Args);
%% @doc execute a function.
execute(Fun, Args) when is_function(Fun) ->
erlang:apply(Fun, Args);
execute({Fun, InitArgs}, Args) when is_function(Fun) ->
erlang:apply(Fun, Args ++ InitArgs);
execute({M, F, A}, Args) ->
@ -142,11 +151,11 @@ lookup(HookPoint) ->
%%------------------------------------------------------------------------------
init([]) ->
ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}, protected]),
{ok, #{}}.
handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) ->
Reply = case lists:keymember(Action, 2, Callbacks = lookup(HookPoint)) of
Reply = case lists:keymember(Action, #callback.action, Callbacks = lookup(HookPoint)) of
true ->
{error, already_exists};
false ->

View File

@ -607,27 +607,31 @@ deliver({connack, ReasonCode}, PState) ->
deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
proto_ver = ?MQTT_PROTO_V5,
client_id = ClientId,
conn_props = ConnProps,
is_assigned = IsAssigned,
topic_alias_maximum = TopicAliasMaximum}) ->
ResponseInformation = case maps:find('Request-Response-Information', ConnProps) of
{ok, 1} ->
iolist_to_binary(emqx_config:get_env(response_topic_prefix));
_ ->
<<>>
end,
#{max_packet_size := MaxPktSize,
max_qos_allowed := MaxQoS,
mqtt_retain_available := Retain,
max_topic_alias := MaxAlias,
mqtt_shared_subscription := Shared,
mqtt_wildcard_subscription := Wildcard} = caps(PState),
%% Response-Information is so far not set by broker.
%% i.e. It's a Client-to-Client contract for the request-response topic naming scheme.
%% According to MQTT 5.0 spec:
%% A common use of this is to pass a globally unique portion of the topic tree which
%% is reserved for this Client for at least the lifetime of its Session.
%% This often cannot just be a random name as both the requesting Client and the
%% responding Client need to be authorized to use it.
%% If we are to support it in the feature, the implementation should be flexible
%% to allow prefixing the response topic based on different ACL config.
%% e.g. prefix by username or client-id, so that unauthorized clients can not
%% subscribe requests or responses that are not intended for them.
Props = #{'Retain-Available' => flag(Retain),
'Maximum-Packet-Size' => MaxPktSize,
'Topic-Alias-Maximum' => MaxAlias,
'Wildcard-Subscription-Available' => flag(Wildcard),
'Subscription-Identifier-Available' => 1,
'Response-Information' => ResponseInformation,
%'Response-Information' =>
'Shared-Subscription-Available' => flag(Shared)},
Props1 = if

36
src/emqx_psk.erl Normal file
View File

@ -0,0 +1,36 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_psk).
-include("logger.hrl").
%% SSL PSK Callbacks
-export([lookup/3]).
-define(TAB, ?MODULE).
-type psk_identity() :: string().
-type psk_user_state() :: term().
-spec lookup(psk, psk_identity(), psk_user_state()) -> {ok, SharedSecret :: binary()} | error.
lookup(psk, ClientPSKID, UserState) ->
try emqx_hooks:run('tls_handshake.psk_lookup', [ClientPSKID], UserState) of
{ok, SharedSecret} -> {ok, SharedSecret};
{stop, SharedSecret} -> {ok, SharedSecret}
catch
Except:Error:Stacktrace ->
?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]),
error
end.

View File

@ -14,10 +14,13 @@
-module(emqx_topic).
-include("emqx_mqtt.hrl").
-export([match/2]).
-export([validate/1, validate/2]).
-export([levels/1]).
-export([triples/1]).
-export([tokens/1]).
-export([words/1]).
-export([wildcard/1]).
-export([join/1, prepend/2]).
@ -35,8 +38,6 @@
-define(MAX_TOPIC_LEN, 4096).
-include("emqx_mqtt.hrl").
%% @doc Is wildcard topic?
-spec(wildcard(topic() | words()) -> true | false).
wildcard(Topic) when is_binary(Topic) ->
@ -147,13 +148,19 @@ bin('#') -> <<"#">>;
bin(B) when is_binary(B) -> B;
bin(L) when is_list(L) -> list_to_binary(L).
-spec(levels(topic()) -> pos_integer()).
levels(Topic) when is_binary(Topic) ->
length(words(Topic)).
length(tokens(Topic)).
%% @doc Split topic to tokens.
-spec(tokens(topic()) -> list(binary())).
tokens(Topic) ->
binary:split(Topic, <<"/">>, [global]).
%% @doc Split Topic Path to Words
-spec(words(topic()) -> words()).
words(Topic) when is_binary(Topic) ->
[word(W) || W <- binary:split(Topic, <<"/">>, [global])].
[word(W) || W <- tokens(Topic)].
word(<<>>) -> '';
word(<<"+">>) -> '+';

View File

@ -30,7 +30,7 @@ init_per_suite(Config) ->
[start_apps(App, {SchemaFile, ConfigFile}) ||
{App, SchemaFile, ConfigFile}
<- [{emqx, local_path("priv/emqx.schema"),
local_path("etc/emqx.conf")}]],
local_path("etc/gen.emqx.conf")}]],
Config.
end_per_suite(_Config) ->
@ -85,12 +85,12 @@ t_alarm_handler(_) ->
Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>),
Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>),
SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0},
emqx_client_sock:send(Sock,
emqx_client_sock:send(Sock,
raw_send_serialize(
?SUBSCRIBE_PACKET(
1,
1,
[{Topic1, SubOpts},
{Topic2, SubOpts}]),
{Topic2, SubOpts}]),
#{version => ?MQTT_PROTO_V5})),
{ok, Data2} = gen_tcp:recv(Sock, 0),
@ -119,16 +119,16 @@ t_alarm_handler(_) ->
t_logger_handler(_) ->
%% Meck supervisor report
logger:log(error, #{label => {supervisor, start_error},
report => [{supervisor, {local, tmp_sup}},
{errorContext, shutdown},
{reason, reached_max_restart_intensity},
logger:log(error, #{label => {supervisor, start_error},
report => [{supervisor, {local, tmp_sup}},
{errorContext, shutdown},
{reason, reached_max_restart_intensity},
{offender, [{pid, meck},
{id, meck},
{mfargs, {meck, start_link, []}},
{restart_type, permanent},
{shutdown, 5000},
{child_type, worker}]}]},
{child_type, worker}]}]},
#{logger_formatter => #{title => "SUPERVISOR REPORT"},
report_cb => fun logger:format_otp_report/1}),
?assertEqual(true, lists:keymember(supervisor_report, 1, emqx_alarm_handler:get_alarms())).
@ -142,4 +142,3 @@ raw_send_serialize(Packet, Opts) ->
raw_recv_parse(P, ProtoVersion) ->
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
version => ProtoVersion}}).

View File

@ -32,8 +32,7 @@
<<"+/+">>, <<"TopicA/#">>]).
all() ->
[{group, mqttv4},
{group, mqttv5}].
[{group, mqttv4}].
groups() ->
[{mqttv4, [non_parallel_tests],
@ -44,10 +43,7 @@ groups() ->
%% keepalive_test,
redelivery_on_reconnect_test,
%% subscribe_failure_test,
dollar_topics_test]},
{mqttv5, [non_parallel_tests],
[request_response,
share_sub_request_topic]}].
dollar_topics_test]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
@ -56,89 +52,6 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
request_response_exception(QoS) ->
{ok, Client} = emqx_client:start_link([{proto_ver, v5},
{properties, #{ 'Request-Response-Information' => 0 }}]),
{ok, _} = emqx_client:connect(Client),
?assertError(no_response_information,
emqx_client:sub_request_topic(Client, QoS, <<"request_topic">>)),
ok = emqx_client:disconnect(Client).
request_response_per_qos(QoS) ->
{ok, Requester} = emqx_client:start_link([{proto_ver, v5},
{client_id, <<"requester">>},
{properties, #{ 'Request-Response-Information' => 1}}]),
{ok, _} = emqx_client:connect(Requester),
{ok, Responser} = emqx_client:start_link([{proto_ver, v5},
{client_id, <<"responser">>},
{properties, #{
'Request-Response-Information' => 1}},
{request_handler,
fun(_Req) -> <<"ResponseTest">> end}
]),
{ok, _} = emqx_client:connect(Responser),
ok = emqx_client:sub_request_topic(Responser, QoS, <<"request_topic">>),
{ok, <<"ResponseTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS),
ok = emqx_client:set_request_handler(Responser, fun(<<"request_payload">>) ->
<<"ResponseFunctionTest">>;
(_) ->
<<"404">>
end),
{ok, <<"ResponseFunctionTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS),
{ok, <<"404">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"invalid_request">>, QoS),
ok = emqx_client:disconnect(Responser),
ok = emqx_client:disconnect(Requester).
request_response(_Config) ->
request_response_per_qos(?QOS_2),
request_response_per_qos(?QOS_1),
request_response_per_qos(?QOS_0),
request_response_exception(?QOS_0),
request_response_exception(?QOS_1),
request_response_exception(?QOS_2).
share_sub_request_topic(_Config) ->
share_sub_request_topic_per_qos(?QOS_2),
share_sub_request_topic_per_qos(?QOS_1),
share_sub_request_topic_per_qos(?QOS_0).
share_sub_request_topic_per_qos(QoS) ->
application:set_env(?APPLICATION, shared_subscription_strategy, random),
ReqTopic = <<"request-topic">>,
RspTopic = <<"response-topic">>,
Group = <<"g1">>,
Properties = #{ 'Request-Response-Information' => 1},
Opts = fun(ClientId) -> [{proto_ver, v5},
{client_id, atom_to_binary(ClientId, utf8)},
{properties, Properties}
] end,
{ok, Requester} = emqx_client:start_link(Opts(requester)),
{ok, _} = emqx_client:connect(Requester),
{ok, Responser1} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]),
{ok, _} = emqx_client:connect(Responser1),
{ok, Responser2} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]),
{ok, _} = emqx_client:connect(Responser2),
ok = emqx_client:sub_request_topic(Responser1, QoS, ReqTopic, Group),
ok = emqx_client:sub_request_topic(Responser2, QoS, ReqTopic, Group),
%% Send a request, wait for response, validate response then return responser ID
ReqFun = fun(Req) ->
{ok, Rsp} = emqx_client:request(Requester, RspTopic, ReqTopic, Req, QoS),
case Rsp of
<<"1-", Req/binary>> -> 1;
<<"2-", Req/binary>> -> 2
end
end,
Ids = lists:map(fun(I) -> ReqFun(integer_to_binary(I)) end, lists:seq(1, 100)),
%% we are testing with random shared-dispatch strategy,
%% fail if not all responsers got a chance to handle requests
?assertEqual([1, 2], lists:usort(Ids)),
ok = emqx_client:disconnect(Responser1),
ok = emqx_client:disconnect(Responser2),
ok = emqx_client:disconnect(Requester).
receive_messages(Count) ->
receive_messages(Count, []).

View File

@ -34,17 +34,23 @@ add_delete_hook(_) ->
?assertEqual(Callbacks, emqx_hooks:lookup(test_hook)),
ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1),
ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun2/1),
timer:sleep(1000),
timer:sleep(200),
?assertEqual([], emqx_hooks:lookup(test_hook)),
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 8),
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun1, []}, 9),
Callbacks2 = [{callback, {?MODULE, hook_fun1, []}, undefined, 9},
{callback, {?MODULE, hook_fun2, []}, undefined, 8}],
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun8, []}, 8),
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 2),
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun10, []}, 10),
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun9, []}, 9),
Callbacks2 = [{callback, {?MODULE, hook_fun10, []}, undefined, 10},
{callback, {?MODULE, hook_fun9, []}, undefined, 9},
{callback, {?MODULE, hook_fun8, []}, undefined, 8},
{callback, {?MODULE, hook_fun2, []}, undefined, 2}],
?assertEqual(Callbacks2, emqx_hooks:lookup(emqx_hook)),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun1, []}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2}),
timer:sleep(1000),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, []}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun8, []}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun9, []}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun10, []}),
timer:sleep(200),
?assertEqual([], emqx_hooks:lookup(emqx_hook)),
ok = emqx_hooks:stop().
@ -67,16 +73,27 @@ run_hooks(_) ->
ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}),
{stop, []} = emqx:run_hooks(foldl_hook2, [arg], []),
ok = emqx:hook(filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0),
ok = emqx:run_hooks(filter1_hook, [arg]),
%% foreach hook always returns 'ok' or 'stop'
ok = emqx:hook(foreach_filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0),
?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg])), %% filter passed
?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg1])), %% filter failed
ok = emqx:hook(filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, []}),
{ok, []} = emqx:run_hooks(filter2_hook, [arg], []),
%% foldl hook always returns {'ok', Acc} or {'stop', Acc}
ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, [init_arg]}),
ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, {?MODULE, hook_filter2_1, [init_arg]}),
?assertEqual({ok, 3}, emqx:run_hooks(foldl_filter2_hook, [arg], 1)),
?assertEqual({ok, 2}, emqx:run_hooks(foldl_filter2_hook, [arg1], 1)),
ok = emqx_hooks:stop().
hook_fun1([]) -> ok.
hook_fun2([]) -> {ok, []}.
hook_fun1(arg) -> ok;
hook_fun1(_) -> stop.
hook_fun2(arg) -> ok;
hook_fun2(_) -> stop.
hook_fun2(_, Acc) -> {ok, Acc + 1}.
hook_fun2_1(_, Acc) -> {ok, Acc + 1}.
hook_fun3(arg1, arg2, _Acc, init) -> ok.
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}.
@ -89,6 +106,12 @@ hook_fun8(arg, initArg) -> stop.
hook_fun9(arg, _Acc) -> any.
hook_fun10(arg, _Acc) -> stop.
hook_filter1(arg) -> true.
hook_filter2(arg, _Acc) -> true.
hook_filter1(arg) -> true;
hook_filter1(_) -> false.
hook_filter2(arg, _Acc, init_arg) -> true;
hook_filter2(_, _Acc, _IntArg) -> false.
hook_filter2_1(arg, _Acc, init_arg) -> true;
hook_filter2_1(arg1, _Acc, init_arg) -> true;
hook_filter2_1(_, _Acc, _IntArg) -> false.

View File

@ -154,9 +154,10 @@ connect_v5(_) ->
#{'Request-Response-Information' => 1}})
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
#{'Response-Information' := _RespInfo}), _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5)
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5),
?assertNot(maps:is_key('Response-Information', Props)),
ok
end),
% topic alias = 0

View File

@ -0,0 +1,102 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% @doc This module implements a request handler based on emqx_client.
%% A request handler is a MQTT client which subscribes to a request topic,
%% processes the requests then send response to another topic which is
%% subscribed by the request sender.
%% This code is in test directory because request and response are pure
%% client-side behaviours.
-module(emqx_request_handler).
-export([start_link/4, stop/1]).
-include("emqx_client.hrl").
-type qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos().
-type topic() :: emqx_topic:topic().
-type handler() :: fun((CorrData :: binary(), ReqPayload :: binary()) -> RspPayload :: binary()).
-spec start_link(topic(), qos(), handler(), emqx_client:options()) ->
{ok, pid()} | {error, any()}.
start_link(RequestTopic, QoS, RequestHandler, Options0) ->
Parent = self(),
MsgHandler = make_msg_handler(RequestHandler, Parent),
Options = [{msg_handler, MsgHandler} | Options0],
case emqx_client:start_link(Options) of
{ok, Pid} ->
{ok, _} = emqx_client:connect(Pid),
try subscribe(Pid, RequestTopic, QoS) of
ok -> {ok, Pid};
{error, _} = Error -> Error
catch
C : E : S ->
emqx_client:stop(Pid),
{error, {C, E, S}}
end;
{error, _} = Error -> Error
end.
stop(Pid) ->
emqx_client:disconnect(Pid).
make_msg_handler(RequestHandler, Parent) ->
#{publish => fun(Msg) -> handle_msg(Msg, RequestHandler, Parent) end,
puback => fun(_Ack) -> ok end,
disconnected => fun(_Reason) -> ok end
}.
handle_msg(ReqMsg, RequestHandler, Parent) ->
#{qos := QoS, properties := Props, payload := ReqPayload} = ReqMsg,
case maps:find('Response-Topic', Props) of
{ok, RspTopic} when RspTopic =/= <<>> ->
CorrData = maps:get('Correlation-Data', Props),
RspProps = maps:without(['Response-Topic'], Props),
RspPayload = RequestHandler(CorrData, ReqPayload),
RspMsg = #mqtt_msg{qos = QoS,
topic = RspTopic,
props = RspProps,
payload = RspPayload
},
emqx_logger:debug("~p sending response msg to topic ~s with~n"
"corr-data=~p~npayload=~p",
[?MODULE, RspTopic, CorrData, RspPayload]),
ok = send_response(RspMsg);
_ ->
Parent ! {discarded, ReqPayload},
ok
end.
send_response(Msg) ->
%% This function is evaluated by emqx_client itself.
%% hence delegate to another temp process for the loopback gen_statem call.
Client = self(),
_ = spawn_link(fun() ->
case emqx_client:publish(Client, Msg) of
ok -> ok;
{ok, _} -> ok;
{error, Reason} -> exit({failed_to_publish_response, Reason})
end
end),
ok.
subscribe(Client, Topic, QoS) ->
{ok, _Props, _QoS} =
emqx_client:subscribe(Client, [{Topic, [{rh, 2}, {rap, false},
{nl, true}, {qos, QoS}]}]),
ok.

View File

@ -0,0 +1,68 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_request_response_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]).
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
all() ->
[request_response].
request_response(_Config) ->
request_response_per_qos(?QOS_0),
request_response_per_qos(?QOS_1),
request_response_per_qos(?QOS_2).
request_response_per_qos(QoS) ->
ReqTopic = <<"request_topic">>,
RspTopic = <<"response_topic">>,
{ok, Requester} = emqx_request_sender:start_link(RspTopic, QoS,
[{proto_ver, v5},
{client_id, <<"requester">>},
{properties, #{ 'Request-Response-Information' => 1}}]),
%% This is a square service
Square = fun(_CorrData, ReqBin) ->
I = b2i(ReqBin),
i2b(I * I)
end,
{ok, Responser} = emqx_request_handler:start_link(ReqTopic, QoS, Square,
[{proto_ver, v5},
{client_id, <<"responser">>}
]),
ok = emqx_request_sender:send(Requester, ReqTopic, RspTopic, <<"corr-1">>, <<"2">>, QoS),
receive
{response, <<"corr-1">>, <<"4">>} ->
ok;
Other ->
erlang:error({unexpected, Other})
after
100 ->
erlang:error(timeout)
end,
ok = emqx_request_sender:stop(Requester),
ok = emqx_request_handler:stop(Responser).
b2i(B) -> binary_to_integer(B).
i2b(I) -> integer_to_binary(I).

View File

@ -0,0 +1,82 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% @doc This module implements a request sender based on emqx_client.
%% A request sender is a MQTT client which sends messages to a request
%% topic, and subscribes to another topic for responses.
%% This code is in test directory because request and response are pure
%% client-side behaviours.
-module(emqx_request_sender).
-export([start_link/3, stop/1, send/6]).
-include("emqx_client.hrl").
start_link(ResponseTopic, QoS, Options0) ->
Parent = self(),
MsgHandler = make_msg_handler(Parent),
Options = [{msg_handler, MsgHandler} | Options0],
case emqx_client:start_link(Options) of
{ok, Pid} ->
{ok, _} = emqx_client:connect(Pid),
try subscribe(Pid, ResponseTopic, QoS) of
ok -> {ok, Pid};
{error, _} = Error -> Error
catch
C : E : S ->
emqx_client:stop(Pid),
{error, {C, E, S}}
end;
{error, _} = Error -> Error
end.
%% @doc Send a message to request topic with correlation-data `CorrData'.
%% Response should be delivered as a `{response, CorrData, Payload}'
send(Client, ReqTopic, RspTopic, CorrData, Payload, QoS) ->
Props = #{'Response-Topic' => RspTopic,
'Correlation-Data' => CorrData
},
Msg = #mqtt_msg{qos = QoS,
topic = ReqTopic,
props = Props,
payload = Payload
},
case emqx_client:publish(Client, Msg) of
ok -> ok; %% QoS = 0
{ok, _} -> ok;
{error, _} = E -> E
end.
stop(Pid) ->
emqx_client:disconnect(Pid).
subscribe(Client, Topic, QoS) ->
case emqx_client:subscribe(Client, Topic, QoS) of
{ok, _, _} -> ok;
{error, _} = Error -> Error
end.
make_msg_handler(Parent) ->
#{publish => fun(Msg) -> handle_msg(Msg, Parent) end,
puback => fun(_Ack) -> ok end,
disconnected => fun(_Reason) -> ok end
}.
handle_msg(Msg, Parent) ->
#{properties := Props, payload := Payload} = Msg,
CorrData = maps:get('Correlation-Data', Props),
Parent ! {response, CorrData, Payload},
ok.

View File

@ -20,8 +20,17 @@
-compile(export_all).
-compile(nowarn_export_all).
-import(emqx_topic, [wildcard/1, match/2, validate/1, triples/1, join/1,
words/1, systop/1, feed_var/3, parse/1]).
-import(emqx_topic,
[wildcard/1,
match/2,
validate/1,
triples/1,
join/1,
words/1,
systop/1,
feed_var/3,
parse/1
]).
-define(N, 10000).
@ -32,6 +41,7 @@ all() ->
t_triples,
t_join,
t_levels,
t_tokens,
t_words,
t_systop,
t_feed_var,
@ -165,6 +175,9 @@ t_triples_perf(_) ->
t_levels(_) ->
?assertEqual(4, emqx_topic:levels(<<"a/b/c/d">>)).
t_tokens(_) ->
?assertEqual([<<"a">>, <<"b">>, <<"+">>, <<"#">>], emqx_topic:tokens(<<"a/b/+/#">>)).
t_words(_) ->
['', <<"a">>, '+', '#'] = words(<<"/a/+/#">>),
['', <<"abkc">>, <<"19383">>, '+', <<"akakdkkdkak">>, '#'] = words(<<"/abkc/19383/+/akakdkkdkak/#">>),
@ -193,9 +206,12 @@ t_systop(_) ->
?assertEqual(SysTop2,systop(<<"abc">>)).
t_feed_var(_) ->
?assertEqual(<<"$queue/client/clientId">>, feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)),
?assertEqual(<<"username/test/client/x">>, feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>)),
?assertEqual(<<"username/test/client/clientId">>, feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>)).
?assertEqual(<<"$queue/client/clientId">>,
feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)),
?assertEqual(<<"username/test/client/x">>,
feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>)),
?assertEqual(<<"username/test/client/clientId">>,
feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>)).
long_topic() ->
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 10000)]).
@ -208,3 +224,4 @@ t_parse(_) ->
?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),
?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)),
?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)).