Merge pull request #1859 from emqx/emqx30

Release v3.0-beta.3 for windows
This commit is contained in:
huangdan 2018-09-22 20:11:01 +08:00 committed by GitHub
commit ddc2c0cdeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1203 additions and 450 deletions

2
.gitignore vendored
View File

@ -35,3 +35,5 @@ bbmustache/
etc/gen.emqx.conf
compile_commands.json
cuttlefish
rebar.lock
xrefr

View File

@ -8,8 +8,10 @@ before_install:
script:
- make dep-vsn-check
- make rebar-compile
- make rebar-eunit
- make rebar-ct
- make rebar-cover
- make coveralls
sudo: false

View File

@ -10,7 +10,7 @@ dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0
dep_gproc = git https://github.com/uwiger/gproc 0.8.0
dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.2.0
dep_lager = git https://github.com/erlang-lager/lager 3.6.5
dep_esockd = git https://github.com/emqx/esockd v5.4
dep_esockd = git https://github.com/emqx/esockd v5.4.1
dep_ekka = git https://github.com/emqx/ekka v0.4.1
dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0
dep_clique = git https://github.com/emqx/clique develop
@ -18,7 +18,7 @@ dep_emqx_passwd = git https://github.com/emqx/emqx-passwd win30
NO_AUTOPATCH = cuttlefish
ERLC_OPTS += +debug_info
ERLC_OPTS += +debug_info -DAPPLICATION=emqx
ERLC_OPTS += +'{parse_transform, lager_transform}'
BUILD_DEPS = cuttlefish
@ -27,7 +27,7 @@ dep_cuttlefish = git https://github.com/emqx/cuttlefish emqx30
#TEST_DEPS = emqx_ct_helplers
#dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers
TEST_ERLC_OPTS += +debug_info
TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx
TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
EUNIT_OPTS = verbose
@ -39,7 +39,7 @@ CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access
emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \
emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \
emqx_mountpoint emqx_listeners emqx_protocol
emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub
CT_NODE_NAME = emqxct@127.0.0.1
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
@ -60,7 +60,7 @@ gen-clean:
@rm -f etc/gen.emqx.conf
bbmustache:
$(verbose) git clone https://github.com/soranoba/bbmustache.git && pushd bbmustache && ./rebar3 compile && popd
$(verbose) git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd ..
# This hack is to generate a conf file for testing
# relx overlay is used for release
@ -78,26 +78,29 @@ app.config: etc/gen.emqx.conf
ct: cuttlefish app.config
rebar-cover:
@rebar3 cover
coveralls:
@rebar3 coveralls send
cuttlefish: deps
@mv ./deps/cuttlefish/cuttlefish ./cuttlefish
rebar-cuttlefish: rebar-deps
@make -C _build/default/lib/cuttlefish
@mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish
cuttlefish: rebar-deps
@if [ ! -f cuttlefish ]; then \
make -C _build/default/lib/cuttlefish; \
mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish; \
fi
rebar-deps:
@rebar3 get-deps
rebar-eunit:
rebar-eunit: cuttlefish
@rebar3 eunit
rebar-compile:
@rebar3 compile
rebar-ct: rebar-cuttlefish app.config
rebar-ct: cuttlefish app.config
@rebar3 as test compile
@ln -s -f '../../../../etc' _build/test/lib/emqx/
@ln -s -f '../../../../data' _build/test/lib/emqx/

View File

@ -515,25 +515,31 @@ zone.external.idle_timeout = 15s
## Publish limit for the external MQTT connections.
##
## Value: rate,burst
## Default: 10 messages per second, and 100 messages burst.
## zone.external.publish_limit = 10,100
## Enable ban check.
##
## Value: Flag
zone.external.enable_ban = on
## Value: Number,Duration
## Example: 10 messages per minute.
## zone.external.publish_limit = 10,1m
## Enable ACL check.
##
## Value: Flag
zone.external.enable_acl = on
## Enable ban check.
##
## Value: Flag
zone.external.enable_ban = on
## Enable per connection statistics.
##
## Value: on | off
zone.external.enable_stats = on
## Force MQTT connection/session process GC after this number of
## messages | bytes passed through.
##
## Numbers delimited by `|'. Zero or negative is to disable.
zone.external.force_gc_policy = 1000|1MB
## Maximum MQTT packet size allowed.
##
## Value: Bytes
@ -750,7 +756,7 @@ listener.tcp.external.zone = external
##
## Value: rate,burst
## Unit: Bps
## listener.tcp.external.rate_limit = 1024,4096
listener.tcp.external.rate_limit = 1024,4096
## The access control rules for the MQTT/TCP listener.
##
@ -1197,6 +1203,11 @@ listener.ssl.external.reuseaddr = true
## Examples: 8083, 127.0.0.1:8083, ::1:8083
listener.ws.external = 8083
## The path of WebSocket MQTT endpoint
##
## Value: URL Path
listener.ws.external.mqtt_path = /mqtt
## The acceptor pool for external MQTT/WebSocket listener.
##
## Value: Number
@ -1336,6 +1347,11 @@ listener.ws.external.nodelay = true
## Examples: 8084, 127.0.0.1:8084, ::1:8084
listener.wss.external = 8084
## The path of WebSocket MQTT endpoint
##
## Value: URL Path
listener.wss.external.mqtt_path = /mqtt
## The acceptor pool for external MQTT/WebSocket/SSL listener.
##
## Value: Number
@ -1531,241 +1547,257 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
##--------------------------------------------------------------------
##--------------------------------------------------------------------
## Bridges to edge
## Bridges to aws
##--------------------------------------------------------------------
## Bridge type.
##
## Value: Enum
## Example: out | in
bridge.edge.type = in
## Bridge address: node name for local bridge, host:port for remote.
##
## Value: String
## Example: emqx@127.0.0.1, 127.0.0.1:1883
bridge.edge.address = 127.0.0.1:1883
## Protocol version of the bridge.
##
## Value: Enum
## - mqtt5
## - mqtt4
## - mqtt3
bridge.edge.proto_ver = mqtt4
## The ClientId of a remote bridge.
##
## Value: String
bridge.edge.client_id = bridge_edge
## The Clean start flag of a remote bridge.
##
## Value: boolean
bridge.edge.clean_start = false
## The username for a remote bridge.
##
## Value: String
bridge.edge.username = user
## The password for a remote bridge.
##
## Value: String
bridge.edge.password = passwd
## Mountpoint of the bridge.
##
## Value: String
## bridge.edge.mountpoint = bridge/edge/
## Ping interval of a down bridge.
##
## Value: Duration
## Default: 10 seconds
bridge.edge.keepalive = 10s
## Subscriptions of the bridge topic.
##
## Value: String
bridge.edge.subscription.1.topic = #
## Subscriptions of the bridge qos.
##
## Value: Number
bridge.edge.subscription.1.qos = 1
## The pending message queue of a bridge.
##
## Value: Number
bridge.edge.max_pending_messages = 10000
## Start type of the bridge.
##
## Value: enum
## manual
## auto
bridge.edge.start_type = manual
## Bridge reconnect count.
##
## Value: Number
bridge.edge.reconnect_count = 10
bridge.aws.start_type = manual
## Bridge reconnect time.
##
## Value: Duration
## Default: 30 seconds
bridge.edge.reconnect_time = 30s
## PEM-encoded CA certificates of the bridge.
##
## Value: File
## bridge.edge.cacertfile = cacert.pem
## SSL Certfile of the bridge.
##
## Value: File
## bridge.edge.certfile = cert.pem
## SSL Keyfile of the bridge.
##
## Value: File
## bridge.edge.keyfile = key.pem
## SSL Ciphers used by the bridge.
##
## Value: String
## bridge.edge.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## TLS versions used by the bridge.
##
## Value: String
## bridge.edge.tls_versions = tlsv1.2,tlsv1.1,tlsv1
##--------------------------------------------------------------------
## Bridges to cloud
##--------------------------------------------------------------------
## Bridge type.
##
## Value: Enum
## Example: out | in
bridge.cloud.type = out
bridge.aws.reconnect_interval = 30s
## Bridge address: node name for local bridge, host:port for remote.
##
## Value: String
## Example: emqx@127.0.0.1, 127.0.0.1:1883
bridge.cloud.address = 127.0.0.1:1883
bridge.aws.address = 127.0.0.1:1883
## Protocol version of the bridge.
##
## Value: Enum
## - mqtt5
## - mqtt4
## - mqtt3
bridge.cloud.proto_ver = mqtt4
## - mqttv5
## - mqttv4
## - mqttv3
bridge.aws.proto_ver = mqttv4
## The ClientId of a remote bridge.
##
## Value: String
bridge.cloud.client_id = bridge_cloud
bridge.aws.client_id = bridge_aws
## The Clean start flag of a remote bridge.
##
## Value: boolean
bridge.cloud.clean_start = false
bridge.aws.clean_start = false
## The username for a remote bridge.
##
## Value: String
bridge.cloud.username = user
bridge.aws.username = user
## The password for a remote bridge.
##
## Value: String
bridge.cloud.password = passwd
bridge.aws.password = passwd
## Mountpoint of the bridge.
##
## Value: String
bridge.cloud.mountpoint = bridge/edge/${node}/
bridge.aws.mountpoint = bridge/aws/${node}/
## Ping interval of a down bridge.
##
## Value: Duration
## Default: 10 seconds
bridge.cloud.keepalive = 10s
bridge.aws.keepalive = 60s
## Forward message topics
##
## Value: String
## Example: topic1/#,topic2/#
bridge.cloud.forward_rule = #
bridge.aws.forwards = topic1/#,topic2/#
## Subscriptions of the bridge topic.
##
## Value: String
bridge.cloud.subscription.1.topic = $share/cmd/topic1
bridge.aws.subscription.1.topic = cmd/topic1
## Subscriptions of the bridge qos.
##
## Value: Number
bridge.cloud.subscription.1.qos = 1
bridge.aws.subscription.1.qos = 1
## Bridge store message type.
## Subscriptions of the bridge topic.
##
## Value: String
bridge.aws.subscription.2.topic = cmd/topic2
## Subscriptions of the bridge qos.
##
## Value: Number
bridge.aws.subscription.2.qos = 1
## Bridge message queue message type.
##
## Value: Enum
## Example: memory | disk
bridge.cloud.store_type = memory
bridge.aws.mqueue_type = memory
## The pending message queue of a bridge.
##
## Value: Number
bridge.cloud.max_pending_messages = 10000
bridge.aws.max_pending_messages = 10000
## PEM-encoded CA certificates of the bridge.
##
## Value: File
## bridge.aws.cacertfile = cacert.pem
## SSL Certfile of the bridge.
##
## Value: File
## bridge.aws.certfile = cert.pem
## SSL Keyfile of the bridge.
##
## Value: File
## bridge.aws.keyfile = key.pem
## SSL Ciphers used by the bridge.
##
## Value: String
## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## TLS versions used by the bridge.
##
## Value: String
## bridge.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1
##--------------------------------------------------------------------
## Bridges to azure
##--------------------------------------------------------------------
## Start type of the bridge.
##
## Value: enum
## manual
## auto
bridge.cloud.start_type = manual
## bridge.azure.start_type = manual
## Bridge reconnect count.
##
## Value: Number
bridge.cloud.reconnect_count = 10
## bridge.azure.reconnect_count = 10
## Bridge reconnect time.
##
## Value: Duration
## Default: 30 seconds
bridge.cloud.reconnect_time = 30s
## bridge.azure.reconnect_time = 30s
## Bridge address: node name for local bridge, host:port for remote.
##
## Value: String
## Example: emqx@127.0.0.1, 127.0.0.1:1883
## bridge.azure.address = 127.0.0.1:1883
## Protocol version of the bridge.
##
## Value: Enum
## - mqttv5
## - mqttv4
## - mqttv3
## bridge.azure.proto_ver = mqttv4
## The ClientId of a remote bridge.
##
## Value: String
## bridge.azure.client_id = bridge_azure
## The Clean start flag of a remote bridge.
##
## Value: boolean
## bridge.azure.clean_start = false
## The username for a remote bridge.
##
## Value: String
## bridge.azure.username = user
## The password for a remote bridge.
##
## Value: String
## bridge.azure.password = passwd
## Mountpoint of the bridge.
##
## Value: String
## bridge.azure.mountpoint = bridge/azure/${node}/
## Ping interval of a down bridge.
##
## Value: Duration
## Default: 10 seconds
## bridge.azure.keepalive = 10s
## Forward message topics
##
## Value: String
## Example: topic1/#,topic2/#
## bridge.azure.forwards = topic1/#,topic2/#
## Subscriptions of the bridge topic.
##
## Value: String
## bridge.azure.subscription.1.topic = $share/cmd/topic1
## Subscriptions of the bridge qos.
##
## Value: Number
## bridge.azure.subscription.1.qos = 1
## Subscriptions of the bridge topic.
##
## Value: String
## bridge.azure.subscription.2.topic = $share/cmd/topic2
## Subscriptions of the bridge qos.
##
## Value: Number
## bridge.azure.subscription.2.qos = 1
## Bridge store message type.
##
## Value: Enum
## Example: memory | disk
## bridge.azure.store_type = memory
## The pending message queue of a bridge.
##
## Value: Number
## bridge.azure.max_pending_messages = 10000
## PEM-encoded CA certificates of the bridge.
##
## Value: File
## bridge.cloud.cacertfile = cacert.pem
## bridge.azure.cacertfile = cacert.pem
## SSL Certfile of the bridge.
##
## Value: File
## bridge.cloud.certfile = cert.pem
## bridge.azure.certfile = cert.pem
## SSL Keyfile of the bridge.
##
## Value: File
## bridge.cloud.keyfile = key.pem
## bridge.azure.keyfile = key.pem
## SSL Ciphers used by the bridge.
##
## Value: String
## bridge.cloud.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## bridge.azure.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## TLS versions used by the bridge.
##
## Value: String
## bridge.cloud.tls_versions = tlsv1.2,tlsv1.1,tlsv1
## bridge.azure.tls_versions = tlsv1.2,tlsv1.1,tlsv1
##--------------------------------------------------------------------
## Modules

View File

@ -649,14 +649,14 @@ end}.
{datatype, {enum, [allow, deny]}}
]}.
%% @doc Enable Ban.
{mapping, "zone.$name.enable_ban", "emqx.zones", [
%% @doc Enable ACL check.
{mapping, "zone.$name.enable_acl", "emqx.zones", [
{default, off},
{datatype, flag}
]}.
%% @doc Enable ACL check.
{mapping, "zone.$name.enable_acl", "emqx.zones", [
%% @doc Enable Ban.
{mapping, "zone.$name.enable_ban", "emqx.zones", [
{default, off},
{datatype, flag}
]}.
@ -669,7 +669,6 @@ end}.
%% @doc Publish limit of the MQTT connections.
{mapping, "zone.$name.publish_limit", "emqx.zones", [
{default, undefined},
{datatype, string}
]}.
@ -770,7 +769,7 @@ end}.
%% @doc Session Expiry Interval
{mapping, "zone.$name.session_expiry_interval", "emqx.zones", [
{default, "2h"},
{datatype, {duration, ms}}
{datatype, {duration, s}}
]}.
%% @doc Type: simple | priority
@ -797,6 +796,25 @@ end}.
{datatype, {enum, [true, false]}}
]}.
%% @doc Force connection/session process GC after this number of
%% messages | bytes passed through.
%% Numbers delimited by `|'. Zero or negative is to disable.
{mapping, "zone.$name.force_gc_policy", "emqx.zones", [
{default, "0 | 0MB"},
{datatype, string}
]}.
%% @doc Max message queue length and total heap size to force shutdown
%% connection/session process.
%% Message queue here is the Erlang process mailbox, but not the number
%% of queued MQTT messages of QoS 1 and 2.
%% Total heap size is the in Erlang 'words' not in 'bytes'.
%% Zero or negative is to disable.
{mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [
{default, "0 | 0MB"},
{datatype, string}
]}.
{translation, "emqx.zones", fun(Conf) ->
Mapping = fun("retain_available", Val) ->
{mqtt_retain_available, Val};
@ -804,6 +822,34 @@ end}.
{mqtt_wildcard_subscription, Val};
("shared_subscription", Val) ->
{mqtt_shared_subscription, Val};
("publish_limit", Val) ->
[Limit, Duration] = string:tokens(Val, ", "),
PubLimit = case cuttlefish_duration:parse(Duration, s) of
Secs when is_integer(Secs) ->
{list_to_integer(Limit) / Secs, list_to_integer(Limit)};
{error, Reason} ->
error(Reason)
end,
{publish_limit, PubLimit};
("force_gc_policy", Val) ->
[Count, Bytes] = string:tokens(Val, "| "),
GcPolicy = case cuttlefish_bytesize:parse(Bytes) of
{error, Reason} ->
error(Reason);
Bytes1 ->
#{bytes => Bytes1, count => list_to_integer(Count)}
end,
{force_gc_policy, GcPolicy};
("force_shutdown_policy", Val) ->
[Len, Siz] = string:tokens(Val, "| "),
ShutdownPolicy = case cuttlefish_bytesize:parse(Siz) of
{error, Reason} ->
error(Reason);
Siz1 ->
#{message_queue_len => list_to_integer(Len),
total_heap_size => Siz1}
end,
{force_shutdown_policy, ShutdownPolicy};
(Opt, Val) ->
{list_to_atom(Opt), Val}
end,
@ -1067,6 +1113,11 @@ end}.
{datatype, [integer, ip]}
]}.
{mapping, "listener.ws.$name.mqtt_path", "emqx.listeners", [
{default, "/mqtt"},
{datatype, string}
]}.
{mapping, "listener.ws.$name.acceptors", "emqx.listeners", [
{default, 8},
{datatype, integer}
@ -1168,6 +1219,11 @@ end}.
{datatype, [integer, ip]}
]}.
{mapping, "listener.wss.$name.mqtt_path", "emqx.listeners", [
{default, "/mqtt"},
{datatype, string}
]}.
{mapping, "listener.wss.$name.acceptors", "emqx.listeners", [
{default, 8},
{datatype, integer}
@ -1339,6 +1395,7 @@ end}.
LisOpts = fun(Prefix) ->
Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
{mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)},
{max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)},
{max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)},
{tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)},
@ -1416,12 +1473,7 @@ end}.
%%--------------------------------------------------------------------
%% Bridges
%%--------------------------------------------------------------------
{mapping, "bridge.$name.type", "emqx.bridges", [
{datatype, {enum, [in, out]}}
]}.
{mapping, "bridge.$name.store_type", "emqx.bridges", [
{mapping, "bridge.$name.mqueue_type", "emqx.bridges", [
{datatype, {enum, [memory, disk]}}
]}.
@ -1430,7 +1482,7 @@ end}.
]}.
{mapping, "bridge.$name.proto_ver", "emqx.bridges", [
{datatype, {enum, [mqtt3, mqtt4, mqtt5]}}
{datatype, {enum, [mqttv3, mqttv4, mqttv5]}}
]}.
{mapping, "bridge.$name.client_id", "emqx.bridges", [
@ -1454,7 +1506,7 @@ end}.
{datatype, string}
]}.
{mapping, "bridge.$name.forward_rule", "emqx.bridges", [
{mapping, "bridge.$name.forwards", "emqx.bridges", [
{datatype, string}
]}.
@ -1501,12 +1553,7 @@ end}.
{default, auto}
]}.
{mapping, "bridge.$name.reconnect_count", "emqx.bridges", [
{default, 10},
{datatype, integer}
]}.
{mapping, "bridge.$name.reconnect_time", "emqx.bridges", [
{mapping, "bridge.$name.reconnect_interval", "emqx.bridges", [
{default, "30s"},
{datatype, {duration, s}}
]}.
@ -1660,9 +1707,16 @@ end}.
{datatype, {enum, [local,one,quorum,all]}}
]}.
%% @doc Shared Subscription Dispatch Strategy.
{mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [
{default, random},
{datatype, {enum, [random, round_robbin, hash]}}
{default, round_robbin},
{datatype,
{enum,
[random, %% randomly pick a subscriber
round_robbin, %% round robin alive subscribers one message after another
sticky, %% pick a random subscriber and stick to it
hash %% hash client ID to a group member
]}}
]}.
{mapping, "broker.route_batch_clean", "emqx.route_batch_clean", [
@ -1712,3 +1766,4 @@ end}.
{busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
end}.

View File

@ -20,7 +20,8 @@
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform, lager_transform}]}.
{parse_transform, lager_transform},
{d, 'APPLICATION', emqx}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
@ -29,10 +30,5 @@
{cover_export_enabled, true}.
%% rebar3_neotoma_plugin is needed to compile the .peg file for cuttlefish
{plugins, [rebar3_neotoma_plugin]}.
{plugins, [coveralls, rebar3_neotoma_plugin]}.
%% Do not include cuttlefish's dependencies as mine
%% its dependencies are only fetched to compile itself
%% they are however not needed by emqx
{overrides, [{override, cuttlefish, [{deps, []}]}
]}.

View File

@ -3,7 +3,6 @@ CONFIG1 = case os:getenv("TRAVIS") of
"true" ->
JobId = os:getenv("TRAVIS_JOB_ID"),
[{coveralls_service_job_id, JobId},
{plugins, [coveralls]},
{coveralls_coverdata, "_build/test/cover/*.coverdata"},
{coveralls_service_name , "travis-ci"} | CONFIG];
_ ->

View File

@ -23,12 +23,16 @@
-export([start_link/2, start_bridge/1, stop_bridge/1, status/1]).
-export([show_forwards/1, add_forward/2, del_forward/2]).
-export([show_subscriptions/1, add_subscription/3, del_subscription/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-record(state, {client_pid, options, reconnect_time, reconnect_count,
def_reconnect_count, type, mountpoint, queue, store_type,
max_pending_messages}).
-record(state, {client_pid, options, reconnect_interval,
mountpoint, queue, mqueue_type, max_pending_messages,
forwards = [], subscriptions = []}).
-record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false,
packet_id, topic, props, payload}).
@ -42,6 +46,50 @@ start_bridge(Name) ->
stop_bridge(Name) ->
gen_server:call(name(Name), stop_bridge).
-spec(show_forwards(atom()) -> list()).
show_forwards(Name) ->
gen_server:call(name(Name), show_forwards).
-spec(add_forward(atom(), binary()) -> ok | {error, already_exists | validate_fail}).
add_forward(Name, Topic) ->
case catch emqx_topic:validate({filter, Topic}) of
true ->
gen_server:call(name(Name), {add_forward, Topic});
{'EXIT', _Reason} ->
{error, validate_fail}
end.
-spec(del_forward(atom(), binary()) -> ok | {error, validate_fail}).
del_forward(Name, Topic) ->
case catch emqx_topic:validate({filter, Topic}) of
true ->
gen_server:call(name(Name), {del_forward, Topic});
_ ->
{error, validate_fail}
end.
-spec(show_subscriptions(atom()) -> list()).
show_subscriptions(Name) ->
gen_server:call(name(Name), show_subscriptions).
-spec(add_subscription(atom(), binary(), integer()) -> ok | {error, already_exists | validate_fail}).
add_subscription(Name, Topic, QoS) ->
case catch emqx_topic:validate({filter, Topic}) of
true ->
gen_server:call(name(Name), {add_subscription, Topic, QoS});
{'EXIT', _Reason} ->
{error, validate_fail}
end.
-spec(del_subscription(atom(), binary()) -> ok | {error, validate_fail}).
del_subscription(Name, Topic) ->
case catch emqx_topic:validate({filter, Topic}) of
true ->
gen_server:call(name(Name), {del_subscription, Topic});
_ ->
{error, validate_fail}
end.
status(Pid) ->
gen_server:call(Pid, status).
@ -55,41 +103,78 @@ init([Options]) ->
manual -> ok;
auto -> erlang:send_after(1000, self(), start)
end,
ReconnectCount = get_value(reconnect_count, Options, 10),
ReconnectTime = get_value(reconnect_time, Options, 30000),
ReconnectInterval = get_value(reconnect_interval, Options, 30000),
MaxPendingMsg = get_value(max_pending_messages, Options, 10000),
Mountpoint = format_mountpoint(get_value(mountpoint, Options)),
StoreType = get_value(store_type, Options, memory),
Type = get_value(type, Options, in),
MqueueType = get_value(mqueue_type, Options, memory),
Queue = [],
{ok, #state{type = Type,
mountpoint = Mountpoint,
{ok, #state{mountpoint = Mountpoint,
queue = Queue,
store_type = StoreType,
mqueue_type = MqueueType,
options = Options,
reconnect_count = ReconnectCount,
reconnect_time = ReconnectTime,
def_reconnect_count = ReconnectCount,
reconnect_interval = ReconnectInterval,
max_pending_messages = MaxPendingMsg}}.
handle_call(start_bridge, _From, State = #state{client_pid = undefined}) ->
{noreply, NewState} = handle_info(start, State),
{reply, <<"start bridge successfully">>, NewState};
{reply, #{msg => <<"start bridge successfully">>}, NewState};
handle_call(start_bridge, _From, State) ->
{reply, <<"bridge already started">>, State};
{reply, #{msg => <<"bridge already started">>}, State};
handle_call(stop_bridge, _From, State = #state{client_pid = undefined}) ->
{reply, <<"bridge not started">>, State};
{reply, #{msg => <<"bridge not started">>}, State};
handle_call(stop_bridge, _From, State = #state{client_pid = Pid}) ->
emqx_client:disconnect(Pid),
{reply, <<"stop bridge successfully">>, State};
{reply, #{msg => <<"stop bridge successfully">>}, State};
handle_call(status, _From, State = #state{client_pid = undefined}) ->
{reply, <<"Stopped">>, State};
{reply, #{status => <<"Stopped">>}, State};
handle_call(status, _From, State = #state{client_pid = _Pid})->
{reply, <<"Running">>, State};
{reply, #{status => <<"Running">>}, State};
handle_call(show_forwards, _From, State = #state{forwards = Forwards}) ->
{reply, Forwards, State};
handle_call({add_forward, Topic}, _From, State = #state{forwards = Forwards}) ->
case not lists:member(Topic, Forwards) of
true ->
emqx_broker:subscribe(Topic),
{reply, ok, State#state{forwards = [Topic | Forwards]}};
false ->
{reply, {error, already_exists}, State}
end;
handle_call({del_forward, Topic}, _From, State = #state{forwards = Forwards}) ->
case lists:member(Topic, Forwards) of
true ->
emqx_broker:unsubscribe(Topic),
{reply, ok, State#state{forwards = lists:delete(Topic, Forwards)}};
false ->
{reply, ok, State}
end;
handle_call(show_subscriptions, _From, State = #state{subscriptions = Subscriptions}) ->
{reply, Subscriptions, State};
handle_call({add_subscription, Topic, Qos}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) ->
case not lists:keymember(Topic, 1, Subscriptions) of
true ->
emqx_client:subscribe(ClientPid, {Topic, Qos}),
{reply, ok, State#state{subscriptions = [{Topic, Qos} | Subscriptions]}};
false ->
{reply, {error, already_exists}, State}
end;
handle_call({del_subscription, Topic}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) ->
case lists:keymember(Topic, 1, Subscriptions) of
true ->
emqx_client:unsubscribe(ClientPid, Topic),
{reply, ok, State#state{subscriptions = lists:keydelete(Topic, 1, Subscriptions)}};
false ->
{reply, ok, State}
end;
handle_call(Req, _From, State) ->
emqx_logger:error("[Bridge] unexpected call: ~p", [Req]),
@ -99,46 +184,24 @@ handle_cast(Msg, State) ->
emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(start, State = #state{reconnect_count = 0}) ->
{noreply, State};
%%----------------------------------------------------------------
%% start in message bridge
%% start message bridge
%%----------------------------------------------------------------
handle_info(start, State = #state{options = Options,
client_pid = undefined,
reconnect_time = ReconnectTime,
reconnect_count = ReconnectCount,
type = in}) ->
reconnect_interval = ReconnectInterval}) ->
case emqx_client:start_link([{owner, self()}|options(Options)]) of
{ok, ClientPid, _} ->
Subs = get_value(subscriptions, Options, []),
[emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs],
{noreply, State#state{client_pid = ClientPid}};
{error,_} ->
erlang:send_after(ReconnectTime, self(), start),
{noreply, State#state{reconnect_count = ReconnectCount-1}}
end;
%%----------------------------------------------------------------
%% start out message bridge
%%----------------------------------------------------------------
handle_info(start, State = #state{options = Options,
client_pid = undefined,
reconnect_time = ReconnectTime,
reconnect_count = ReconnectCount,
type = out}) ->
case emqx_client:start_link([{owner, self()}|options(Options)]) of
{ok, ClientPid, _} ->
Subs = get_value(subscriptions, Options, []),
[emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs],
ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","),
[emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules,
Subs = [{i2b(Topic), Qos} || {Topic, Qos} <- get_value(subscriptions, Options, []),
emqx_topic:validate({filter, i2b(Topic)})],
{noreply, State#state{client_pid = ClientPid}};
Forwards = [i2b(Topic) || Topic <- string:tokens(get_value(forwards, Options, ""), ","),
emqx_topic:validate({filter, i2b(Topic)})],
[emqx_client:subscribe(ClientPid, {Topic, Qos}) || {Topic, Qos} <- Subs],
[emqx_broker:subscribe(Topic) || Topic <- Forwards],
{noreply, State#state{client_pid = ClientPid, subscriptions = Subs, forwards = Forwards}};
{error,_} ->
erlang:send_after(ReconnectTime, self(), start),
{noreply, State#state{reconnect_count = ReconnectCount-1}}
erlang:send_after(ReconnectInterval, self(), start),
{noreply, State}
end;
%%----------------------------------------------------------------
@ -146,14 +209,14 @@ handle_info(start, State = #state{options = Options,
%%----------------------------------------------------------------
handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}},
State = #state{client_pid = Pid, mountpoint = Mountpoint, queue = Queue,
store_type = StoreType, max_pending_messages = MaxPendingMsg}) ->
mqueue_type = MqueueType, max_pending_messages = MaxPendingMsg}) ->
Msg = #mqtt_msg{qos = 1,
retain = Retain,
topic = mountpoint(Mountpoint, Topic),
payload = Payload},
case emqx_client:publish(Pid, Msg) of
{ok, PkgId} ->
{noreply, State#state{queue = store(StoreType, {PkgId, Msg}, Queue, MaxPendingMsg)}};
{noreply, State#state{queue = store(MqueueType, {PkgId, Msg}, Queue, MaxPendingMsg)}};
{error, Reason} ->
emqx_logger:error("Publish fail:~p", [Reason]),
{noreply, State}
@ -172,19 +235,18 @@ handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic
%%----------------------------------------------------------------
%% received remote puback message
%%----------------------------------------------------------------
handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, store_type = StoreType}) ->
handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, mqueue_type = MqueueType}) ->
% lists:keydelete(PkgId, 1, Queue)
{noreply, State#state{queue = delete(StoreType, PkgId, Queue)}};
{noreply, State#state{queue = delete(MqueueType, PkgId, Queue)}};
handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) ->
{noreply, State#state{client_pid = undefined}};
handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid,
reconnect_time = ReconnectTime,
def_reconnect_count = DefReconnectCount}) ->
reconnect_interval = ReconnectInterval}) ->
lager:warning("emqx bridge stop reason:~p", [Reason]),
erlang:send_after(ReconnectTime, self(), start),
{noreply, State#state{client_pid = undefined, reconnect_count = DefReconnectCount}};
erlang:send_after(ReconnectInterval, self(), start),
{noreply, State#state{client_pid = undefined}};
handle_info(Info, State) ->
emqx_logger:error("[Bridge] unexpected info: ~p", [Info]),
@ -196,9 +258,9 @@ terminate(_Reason, #state{}) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
proto_ver(mqtt3) -> v3;
proto_ver(mqtt4) -> v4;
proto_ver(mqtt5) -> v5.
proto_ver(mqttv3) -> v3;
proto_ver(mqttv4) -> v4;
proto_ver(mqttv5) -> v5.
address(Address) ->
case string:tokens(Address, ":") of
[Host] -> {Host, 1883};

View File

@ -27,7 +27,7 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% @doc List all bridges
-spec(bridges() -> [{node(), Status :: binary()}]).
-spec(bridges() -> [{node(), map()}]).
bridges() ->
[{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)].

View File

@ -70,7 +70,7 @@ subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
-spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid(),
emqx_types:subid() | emqx_types:subopts()) -> ok).
subscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
subscribe(Topic, SubPid, SubId, #{});
subscribe(Topic, SubPid, SubId, #{qos => 0});
subscribe(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(SubOpts) ->
subscribe(Topic, SubPid, undefined, SubOpts);
subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) ->

View File

@ -787,7 +787,7 @@ connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes),
connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes),
State = #state{subscriptions = Subscriptions}) ->
case take_call({unsubscribe, PacketId}, State) of
{value, #call{from = From, req = {_, Topics}}, NewState} ->
{value, #call{from = From, req = {_, _, Topics}}, NewState} ->
Subscriptions1 =
lists:foldl(fun(Topic, Acc) ->
maps:remove(Topic, Acc)

View File

@ -148,7 +148,11 @@ init([Transport, RawSocket, Options]) ->
proto_state = ProtoState,
parser_state = ParserState,
enable_stats = EnableStats,
idle_timeout = IdleTimout}),
idle_timeout = IdleTimout
}),
GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
ok = emqx_gc:init(GcPolicy),
erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)),
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
State, self(), IdleTimout);
{error, Reason} ->
@ -200,16 +204,29 @@ handle_cast(Msg, State) ->
handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
case emqx_protocol:deliver(PubOrAck, ProtoState) of
{ok, ProtoState1} ->
{noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))};
State1 = ensure_stats_timer(State#state{proto_state = ProtoState1}),
ok = maybe_gc(State1, PubOrAck),
{noreply, State1};
{error, Reason} ->
shutdown(Reason, State)
end;
handle_info({timeout, Timer, emit_stats},
State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
State = #state{stats_timer = Timer,
proto_state = ProtoState
}) ->
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
{noreply, State#state{stats_timer = undefined}, hibernate};
NewState = State#state{stats_timer = undefined},
Limits = erlang:get(force_shutdown_policy),
case emqx_misc:conn_proc_mng_policy(Limits) of
continue ->
{noreply, NewState};
hibernate ->
ok = emqx_gc:reset(),
{noreply, NewState, hibernate};
{shutdown, Reason} ->
?LOG(warning, "shutdown due to ~p", [Reason], NewState),
shutdown(Reason, NewState)
end;
handle_info(timeout, State) ->
shutdown(idle_timeout, State);
@ -295,15 +312,16 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------
%% Receive and parse data
handle_packet(<<>>, State) ->
{noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))};
handle_packet(<<>>, State0) ->
State = ensure_stats_timer(ensure_rate_limit(State0)),
ok = maybe_gc(State, incoming),
{noreply, State};
handle_packet(Data, State = #state{proto_state = ProtoState,
parser_state = ParserState,
idle_timeout = IdleTimeout}) ->
case catch emqx_frame:parse(Data, ParserState) of
{more, NewParserState} ->
{noreply, State#state{parser_state = NewParserState}, IdleTimeout};
{noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout};
{ok, Packet = ?PACKET(Type), Rest} ->
emqx_metrics:received(Packet),
case emqx_protocol:received(Packet, ProtoState) of
@ -381,7 +399,13 @@ shutdown(Reason, State) ->
stop(Reason, State) ->
{stop, Reason, State}.
maybe_gc(State) ->
%% TODO: gc and shutdown policy
State.
%% For incoming messages, bump gc-stats with packet count and totoal volume
%% For outgoing messages, only 'publish' type is taken into account.
maybe_gc(#state{incoming = #{bytes := Oct, packets := Cnt}}, incoming) ->
ok = emqx_gc:inc(Cnt, Oct);
maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) ->
Oct = iolist_size(Payload),
ok = emqx_gc:inc(1, Oct);
maybe_gc(_, _) ->
ok.

View File

@ -54,14 +54,14 @@ run_command([Cmd | Args]) ->
run_command(list_to_atom(Cmd), Args).
-spec(run_command(cmd(), [string()]) -> ok | {error, term()}).
run_command(set, []) ->
emqx_mgmt_cli_cfg:set_usage(), ok;
% run_command(set, []) ->
% emqx_mgmt_cli_cfg:set_usage(), ok;
run_command(set, Args) ->
emqx_mgmt_cli_cfg:run(["config" | Args]), ok;
% run_command(set, Args) ->
% emqx_mgmt_cli_cfg:run(["config" | Args]), ok;
run_command(show, Args) ->
emqx_mgmt_cli_cfg:run(["config" | Args]), ok;
% run_command(show, Args) ->
% emqx_mgmt_cli_cfg:run(["config" | Args]), ok;
run_command(help, []) ->
usage();

View File

@ -12,38 +12,85 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% GC Utility functions.
%% @doc This module manages an opaque collection of statistics data used to
%% force garbage collection on `self()' process when hitting thresholds.
%% Namely:
%% (1) Total number of messages passed through
%% (2) Total data volume passed through
%% @end
-module(emqx_gc).
%% Memory: (10, 100, 1000)
%%
-export([init/1, inc/2, reset/0]).
-export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2,
maybe_force_gc/3]).
-type st() :: #{ cnt => {integer(), integer()}
, oct => {integer(), integer()}
}.
-spec(conn_max_gc_count() -> integer()).
conn_max_gc_count() ->
case emqx_config:get_env(conn_force_gc_count) of
I when is_integer(I), I > 0 -> I + rand:uniform(I);
I when is_integer(I), I =< 0 -> undefined;
undefined -> undefined
-define(disabled, disabled).
-define(ENABLED(X), (is_integer(X) andalso X > 0)).
%% @doc Initialize force GC parameters.
-spec init(false | map()) -> ok.
init(#{count := Count, bytes := Bytes}) ->
Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)],
Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)],
erlang:put(?MODULE, maps:from_list(Cnt ++ Oct)),
ok;
init(_) -> erlang:put(?MODULE, #{}), ok.
%% @doc Increase count and bytes stats in one call,
%% ensure gc is triggered at most once, even if both thresholds are hit.
-spec inc(pos_integer(), pos_integer()) -> ok.
inc(Cnt, Oct) ->
mutate_pd_with(fun(St) -> inc(St, Cnt, Oct) end).
%% @doc Reset counters to zero.
-spec reset() -> ok.
reset() ->
mutate_pd_with(fun(St) -> reset(St) end).
%% ======== Internals ========
%% mutate gc stats numbers in process dict with the given function
mutate_pd_with(F) ->
St = F(erlang:get(?MODULE)),
erlang:put(?MODULE, St),
ok.
%% Increase count and bytes stats in one call,
%% ensure gc is triggered at most once, even if both thresholds are hit.
-spec inc(st(), pos_integer(), pos_integer()) -> st().
inc(St0, Cnt, Oct) ->
case do_inc(St0, cnt, Cnt) of
{true, St} ->
St;
{false, St1} ->
{_, St} = do_inc(St1, oct, Oct),
St
end.
-spec(reset_conn_gc_count(pos_integer(), tuple()) -> tuple()).
reset_conn_gc_count(Pos, State) ->
case element(Pos, State) of
undefined -> State;
_I -> setelement(Pos, State, conn_max_gc_count())
%% Reset counters to zero.
reset(St) -> reset(cnt, reset(oct, St)).
-spec do_inc(st(), cnt | oct, pos_integer()) -> {boolean(), st()}.
do_inc(St, Key, Num) ->
case maps:get(Key, St, ?disabled) of
?disabled ->
{false, St};
{Init, Remain} when Remain > Num ->
{false, maps:put(Key, {Init, Remain - Num}, St)};
_ ->
{true, do_gc(St)}
end.
maybe_force_gc(Pos, State) ->
maybe_force_gc(Pos, State, fun() -> ok end).
maybe_force_gc(Pos, State, Cb) ->
case element(Pos, State) of
undefined -> State;
I when I =< 0 -> Cb(), garbage_collect(),
reset_conn_gc_count(Pos, State);
I -> setelement(Pos, State, I - 1)
do_gc(St) ->
erlang:garbage_collect(),
reset(St).
reset(Key, St) ->
case maps:get(Key, St, ?disabled) of
?disabled -> St;
{Init, _} -> maps:put(Key, {Init, Init}, St)
end.

View File

@ -51,12 +51,12 @@ start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls ->
%% Start MQTT/WS listener
start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]),
Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]),
start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch);
%% Start MQTT/WSS listener
start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]),
Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]),
start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch).
start_mqtt_listener(Name, ListenOn, Options) ->
@ -67,6 +67,9 @@ start_mqtt_listener(Name, ListenOn, Options) ->
start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) ->
Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}).
mqtt_path(Options) ->
proplists:get_value(mqtt_path, Options, "/mqtt").
ranch_opts(Options) ->
NumAcceptors = proplists:get_value(acceptors, Options, 4),
MaxConnections = proplists:get_value(max_connections, Options, 1024),

View File

@ -22,7 +22,7 @@
-export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]).
-export([set_headers/2]).
-export([get_header/2, get_header/3, set_header/3]).
-export([is_expired/1, check_expiry/1, check_expiry/2, update_expiry/1]).
-export([is_expired/1, update_expiry/1]).
-export([format/1]).
-type(flag() :: atom()).
@ -100,21 +100,6 @@ is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestam
is_expired(_Msg) ->
false.
-spec(check_expiry(emqx_types:message()) -> {ok, pos_integer()} | expired | false).
check_expiry(Msg = #message{timestamp = CreatedAt}) ->
check_expiry(Msg, CreatedAt);
check_expiry(_Msg) ->
false.
-spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false).
check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) ->
case Interval - (elapsed(Since) div 1000) of
Timeout when Timeout > 0 -> {ok, Timeout};
_ -> expired
end;
check_expiry(_Msg, _Since) ->
false.
update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) ->
case elapsed(CreatedAt) of
Elapsed when Elapsed > 0 ->
@ -138,4 +123,3 @@ format(flags, Flags) ->
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
format(headers, Headers) ->
io_lib:format("~p", [Headers]).

View File

@ -15,7 +15,7 @@
-module(emqx_misc).
-export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1,
proc_name/2, proc_stats/0, proc_stats/1]).
proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/1]).
%% @doc Merge options
-spec(merge_opts(list(), list()) -> list()).
@ -36,14 +36,13 @@ start_timer(Interval, Dest, Msg) ->
erlang:start_timer(Interval, Dest, Msg).
-spec(cancel_timer(undefined | reference()) -> ok).
cancel_timer(undefined) ->
ok;
cancel_timer(Timer) ->
case catch erlang:cancel_timer(Timer) of
cancel_timer(Timer) when is_reference(Timer) ->
case erlang:cancel_timer(Timer) of
false ->
receive {timeout, Timer, _} -> ok after 0 -> ok end;
_ -> ok
end.
end;
cancel_timer(_) -> ok.
-spec(proc_name(atom(), pos_integer()) -> atom()).
proc_name(Mod, Id) ->
@ -59,3 +58,50 @@ proc_stats(Pid) ->
{value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats),
[{mailbox_len, V} | Stats1].
-define(DISABLED, 0).
%% @doc Check self() process status against connection/session process management policy,
%% return `continue | hibernate | {shutdown, Reason}' accordingly.
%% `continue': There is nothing out of the ordinary.
%% `hibernate': Nothing to process in my mailbox, and since this check is triggered
%% by a timer, we assume it is a fat chance to continue idel, hence hibernate.
%% `shutdown': Some numbers (message queue length or heap size have hit the limit),
%% hence shutdown for greater good (system stability).
-spec(conn_proc_mng_policy(#{message_queue_len := integer(),
total_heap_size := integer()
} | undefined) -> continue | hibernate | {shutdown, _}).
conn_proc_mng_policy(#{message_queue_len := MaxMsgQueueLen,
total_heap_size := MaxTotalHeapSize
}) ->
Qlength = proc_info(message_queue_len),
Checks =
[{fun() -> is_message_queue_too_long(Qlength, MaxMsgQueueLen) end,
{shutdown, message_queue_too_long}},
{fun() -> is_heap_size_too_large(MaxTotalHeapSize) end,
{shutdown, total_heap_size_too_large}},
{fun() -> Qlength > 0 end, continue},
{fun() -> true end, hibernate}
],
check(Checks);
conn_proc_mng_policy(_) ->
%% disable by default
conn_proc_mng_policy(#{message_queue_len => 0, total_heap_size => 0}).
check([{Pred, Result} | Rest]) ->
case Pred() of
true -> Result;
false -> check(Rest)
end.
is_message_queue_too_long(Qlength, Max) ->
is_enabled(Max) andalso Qlength > Max.
is_heap_size_too_large(Max) ->
is_enabled(Max) andalso proc_info(total_heap_size) > Max.
is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED.
proc_info(Key) ->
{Key, Value} = erlang:process_info(self(), Key),
Value.

View File

@ -18,7 +18,7 @@
%%
%% This module implements a simple in-memory queue for MQTT persistent session.
%%
%% If the broker restarted or crashed, all the messages queued will be gone.
%% If the broker restarts or crashes, all queued messages will be lost.
%%
%% Concept of Message Queue and Inflight Window:
%%
@ -29,12 +29,15 @@
%% |<--- Win Size --->|
%%
%%
%% 1. Inflight Window to store the messages delivered and awaiting for puback.
%% 1. Inflight Window is to store the messages
%% that are delivered but still awaiting for puback.
%%
%% 2. Enqueue messages when the inflight window is full.
%% 2. Messages are enqueued to tail when the inflight window is full.
%%
%% 3. If the queue is full, dropped qos0 messages if store_qos0 is true,
%% otherwise dropped the oldest one.
%% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true`
%% in init options
%%
%% 4. If the queue is full drop the oldest one unless `max_len' is set to `0'.
%%
%% @end

View File

@ -17,7 +17,8 @@
-behaviour(gen_server).
-export([start_link/0, start_link/2]).
-export([submit/1, async_submit/1]).
-export([submit/1, submit/2]).
-export([async_submit/1, async_submit/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@ -25,6 +26,8 @@
-define(POOL, ?MODULE).
-type(task() :: fun() | mfa() | {fun(), Args :: list(any())}).
%% @doc Start pooler supervisor.
start_link() ->
emqx_pool_sup:start_link(?POOL, random, {?MODULE, start_link, []}).
@ -34,18 +37,35 @@ start_link() ->
start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
%% @doc Submit work to the pool
-spec(submit(fun()) -> any()).
submit(Fun) ->
gen_server:call(worker(), {submit, Fun}, infinity).
%% @doc Submit work to the pool.
-spec(submit(task()) -> any()).
submit(Task) ->
call({submit, Task}).
%% @doc Submit work to the pool asynchronously
-spec(async_submit(fun()) -> ok).
async_submit(Fun) ->
gen_server:cast(worker(), {async_submit, Fun}).
-spec(submit(fun(), list(any())) -> any()).
submit(Fun, Args) ->
call({submit, {Fun, Args}}).
%% @private
call(Req) ->
gen_server:call(worker(), Req, infinity).
%% @doc Submit work to the pool asynchronously.
-spec(async_submit(task()) -> ok).
async_submit(Task) ->
cast({async_submit, Task}).
-spec(async_submit(fun(), list(any())) -> ok).
async_submit(Fun, Args) ->
cast({async_submit, {Fun, Args}}).
%% @private
cast(Msg) ->
gen_server:cast(worker(), Msg).
%% @private
worker() ->
gproc_pool:pick_worker(pool).
gproc_pool:pick_worker(?POOL).
%%------------------------------------------------------------------------------
%% gen_server callbacks
@ -55,15 +75,15 @@ init([Pool, Id]) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #{pool => Pool, id => Id}}.
handle_call({submit, Fun}, _From, State) ->
{reply, catch run(Fun), State};
handle_call({submit, Task}, _From, State) ->
{reply, catch run(Task), State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Pool] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({async_submit, Fun}, State) ->
try run(Fun)
handle_cast({async_submit, Task}, State) ->
try run(Task)
catch _:Error:Stacktrace ->
emqx_logger:error("[Pool] error: ~p, ~p", [Error, Stacktrace])
end,
@ -78,7 +98,7 @@ handle_info(Info, State) ->
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->
true = gproc_pool:disconnect_worker(Pool, {Pool, Id}).
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -89,6 +109,8 @@ code_change(_OldVsn, State, _Extra) ->
run({M, F, A}) ->
erlang:apply(M, F, A);
run({F, A}) when is_function(F), is_list(A) ->
erlang:apply(F, A);
run(Fun) when is_function(Fun) ->
Fun().

View File

@ -410,9 +410,17 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
process_packet(?PACKET(?PINGREQ), PState) ->
send(?PACKET(?PINGRESP), PState);
process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) ->
process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}),
PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) ->
case Interval =/= 0 andalso OldInterval =:= 0 of
true ->
deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState),
{error, protocol_error, PState};
false ->
emqx_session:update_expiry_interval(SPid, Interval),
%% Clean willmsg
{stop, normal, PState#pstate{will_msg = undefined}};
{stop, normal, PState#pstate{will_msg = undefined}}
end;
process_packet(?DISCONNECT_PACKET(_), PState) ->
{stop, normal, PState}.
@ -481,22 +489,30 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
max_topic_alias := MaxAlias,
mqtt_shared_subscription := Shared,
mqtt_wildcard_subscription := Wildcard} = caps(PState),
Props = #{'Maximum-QoS' => MaxQoS,
'Retain-Available' => flag(Retain),
Props = #{'Retain-Available' => flag(Retain),
'Maximum-Packet-Size' => MaxPktSize,
'Topic-Alias-Maximum' => MaxAlias,
'Wildcard-Subscription-Available' => flag(Wildcard),
'Subscription-Identifier-Available' => 1,
'Shared-Subscription-Available' => flag(Shared)},
Props1 = if IsAssigned ->
Props#{'Assigned-Client-Identifier' => ClientId};
true -> Props
Props1 = if
MaxQoS =:= ?QOS_2 ->
Props;
true ->
maps:put('Maximum-QoS', MaxQoS, Props)
end,
Props2 = case emqx_zone:get_env(Zone, server_keepalive) of
undefined -> Props1;
Keepalive -> Props1#{'Server-Keep-Alive' => Keepalive}
Props2 = if IsAssigned ->
Props1#{'Assigned-Client-Identifier' => ClientId};
true -> Props1
end,
send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props2), PState);
Props3 = case emqx_zone:get_env(Zone, server_keepalive) of
undefined -> Props2;
Keepalive -> Props2#{'Server-Keep-Alive' => Keepalive}
end,
send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props3), PState);
deliver({connack, ReasonCode, SP}, PState) ->
send(?CONNACK_PACKET(ReasonCode, SP), PState);
@ -562,17 +578,32 @@ maybe_assign_client_id(PState) ->
PState.
try_open_session(#pstate{zone = Zone,
proto_ver = ProtoVer,
client_id = ClientId,
conn_pid = ConnPid,
conn_props = ConnProps,
username = Username,
clean_start = CleanStart}) ->
case emqx_sm:open_session(#{zone => Zone,
SessAttrs = #{
zone => Zone,
client_id => ClientId,
conn_pid => ConnPid,
username => Username,
clean_start => CleanStart,
conn_props => ConnProps}) of
clean_start => CleanStart
},
case emqx_sm:open_session(maps:put(expiry_interval, if
ProtoVer =:= ?MQTT_PROTO_V5 ->
maps:get('Session-Expiry-Interval', ConnProps, 0);
true ->
case CleanStart of
true ->
0;
false ->
emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
end
end, SessAttrs)) of
{ok, SPid} ->
{ok, SPid, false};
Other -> Other

View File

@ -47,6 +47,7 @@
-export([info/1, attrs/1]).
-export([stats/1]).
-export([resume/2, discard/2]).
-export([update_expiry_interval/2]).
-export([subscribe/2, subscribe/4]).
-export([publish/3]).
-export([puback/2, puback/3]).
@ -313,6 +314,10 @@ resume(SPid, ConnPid) ->
discard(SPid, ByPid) ->
gen_server:call(SPid, {discard, ByPid}, infinity).
-spec(update_expiry_interval(spid(), timeout()) -> ok).
update_expiry_interval(SPid, Interval) ->
gen_server:cast(SPid, {expiry_interval, Interval * 1000}).
-spec(close(spid()) -> ok).
close(SPid) ->
gen_server:call(SPid, close, infinity).
@ -326,7 +331,7 @@ init([Parent, #{zone := Zone,
username := Username,
conn_pid := ConnPid,
clean_start := CleanStart,
conn_props := ConnProps}]) ->
expiry_interval := ExpiryInterval}]) ->
process_flag(trap_exit, true),
true = link(ConnPid),
MaxInflight = get_env(Zone, max_inflight),
@ -346,22 +351,21 @@ init([Parent, #{zone := Zone,
awaiting_rel = #{},
await_rel_timeout = get_env(Zone, await_rel_timeout),
max_awaiting_rel = get_env(Zone, max_awaiting_rel),
expiry_interval = expire_interval(Zone, ConnProps),
expiry_interval = ExpiryInterval,
enable_stats = get_env(Zone, enable_stats, true),
deliver_stats = 0,
enqueue_stats = 0,
created_at = os:timestamp()},
created_at = os:timestamp()
},
emqx_sm:register_session(ClientId, attrs(State)),
emqx_sm:set_session_stats(ClientId, stats(State)),
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
ok = emqx_gc:init(GcPolicy),
erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)),
ok = proc_lib:init_ack(Parent, {ok, self()}),
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
expire_interval(_Zone, #{'Session-Expiry-Interval' := I}) ->
I * 1000;
expire_interval(Zone, _ConnProps) -> %% Maybe v3.1.1
get_env(Zone, session_expiry_interval, 0).
init_mqueue(Zone) ->
emqx_mqueue:init(#{type => get_env(Zone, mqueue_type, simple),
max_len => get_env(Zone, max_mqueue_len, 1000),
@ -536,6 +540,9 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
%% Replay delivery and Dequeue pending messages
noreply(dequeue(retry_delivery(true, State1)));
handle_cast({expiry_interval, Interval}, State) ->
{noreply, State#state{expiry_interval = Interval}};
handle_cast(Msg, State) ->
emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
{noreply, State}.
@ -567,21 +574,30 @@ handle_info({timeout, Timer, retry_delivery}, State = #state{retry_timer = Timer
handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) ->
noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined}));
handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) ->
handle_info({timeout, Timer, emit_stats},
State = #state{client_id = ClientId,
stats_timer = Timer}) ->
_ = emqx_sm:set_session_stats(ClientId, stats(State)),
{noreply, State#state{stats_timer = undefined}, hibernate};
NewState = State#state{stats_timer = undefined},
Limits = erlang:get(force_shutdown_policy),
case emqx_misc:conn_proc_mng_policy(Limits) of
continue ->
{noreply, NewState};
hibernate ->
ok = emqx_gc:reset(), %% going to hibernate, reset gc stats
{noreply, NewState, hibernate};
{shutdown, Reason} ->
?LOG(warning, "shutdown due to ~p", [Reason], NewState),
shutdown(Reason, NewState)
end;
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
?LOG(info, "expired, shutdown now:(", [], State),
shutdown(expired, State);
handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = true, conn_pid = ConnPid}) ->
{stop, Reason, State#state{conn_pid = undefined}};
handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) ->
{stop, Reason, State#state{conn_pid = undefined}};
handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start = false, conn_pid = ConnPid}) ->
handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
{noreply, ensure_expire_timer(State#state{conn_pid = undefined})};
handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
@ -744,21 +760,22 @@ dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) ->
end;
%% Deliver qos0 message directly to client
dispatch(Msg = #message{qos = ?QOS0}, State) ->
dispatch(Msg = #message{qos = ?QOS0} = Msg, State) ->
deliver(undefined, Msg, State),
inc_stats(deliver, State);
inc_stats(deliver, Msg, State);
dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight})
dispatch(Msg = #message{qos = QoS} = Msg,
State = #state{next_pkt_id = PacketId, inflight = Inflight})
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
case emqx_inflight:is_full(Inflight) of
true -> enqueue_msg(Msg, State);
false ->
deliver(PacketId, Msg, State),
await(PacketId, Msg, inc_stats(deliver, next_pkt_id(State)))
await(PacketId, Msg, inc_stats(deliver, Msg, next_pkt_id(State)))
end.
enqueue_msg(Msg, State = #state{mqueue = Q}) ->
inc_stats(enqueue, State#state{mqueue = emqx_mqueue:in(Msg, Q)}).
inc_stats(enqueue, Msg, State#state{mqueue = emqx_mqueue:in(Msg, Q)}).
%%------------------------------------------------------------------------------
%% Deliver
@ -775,7 +792,7 @@ redeliver({pubrel, PacketId}, #state{conn_pid = ConnPid}) ->
deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = local}) ->
ConnPid ! {deliver, {publish, PacketId, Msg}};
deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = remote}) ->
emqx_rpc:cast(node(ConnPid), erlang, send, [ConnPid, {deliver, PacketId, Msg}]).
emqx_rpc:cast(node(ConnPid), erlang, send, [ConnPid, {deliver, {publish, PacketId, Msg}}]).
%%------------------------------------------------------------------------------
%% Awaiting ACK for QoS1/QoS2 Messages
@ -859,8 +876,8 @@ ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) ->
ensure_retry_timer(_Timeout, State) ->
State.
ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 ->
State#state{expiry_timer = emqx_misc:start_timer(Interval, expired)};
ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 andalso Interval =/= 16#ffffffff ->
State#state{expiry_timer = emqx_misc:start_timer(Interval * 1000, expired)};
ensure_expire_timer(State) ->
State.
@ -882,11 +899,19 @@ next_pkt_id(State = #state{next_pkt_id = Id}) ->
%%------------------------------------------------------------------------------
%% Inc stats
inc_stats(deliver, State = #state{deliver_stats = I}) ->
inc_stats(deliver, Msg, State = #state{deliver_stats = I}) ->
MsgSize = msg_size(Msg),
ok = emqx_gc:inc(1, MsgSize),
State#state{deliver_stats = I + 1};
inc_stats(enqueue, State = #state{enqueue_stats = I}) ->
inc_stats(enqueue, _Msg, State = #state{enqueue_stats = I}) ->
State#state{enqueue_stats = I + 1}.
%% Take only the payload size into account, add other fields if necessary
msg_size(#message{payload = Payload}) -> payload_size(Payload).
%% Payload should be binary(), but not 100% sure. Need dialyzer!
payload_size(Payload) -> erlang:iolist_size(Payload).
%%------------------------------------------------------------------------------
%% Helper functions
@ -902,5 +927,3 @@ noreply(State) ->
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.
%% TODO: GC Policy and Shutdown Policy
%% maybe_gc(State) -> State.

View File

@ -26,7 +26,6 @@
-export([start_link/0]).
-export([strategy/0]).
-export([subscribe/3, unsubscribe/3]).
-export([dispatch/3]).
@ -36,6 +35,7 @@
-define(SERVER, ?MODULE).
-define(TAB, emqx_shared_subscription).
-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
-record(state, {pmon}).
-record(emqx_shared_subscription, {group, topic, subpid}).
@ -62,9 +62,9 @@ mnesia(copy) ->
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(strategy() -> round_robin | random | hash).
-spec(strategy() -> random | round_robin | sticky | hash).
strategy() ->
emqx_config:get_env(shared_subscription_strategy, random).
emqx_config:get_env(shared_subscription_strategy, round_robin).
subscribe(undefined, _Topic, _SubPid) ->
ok;
@ -80,23 +80,56 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
%% TODO: dispatch strategy, ensure the delivery...
dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}) ->
case pick(subscribers(Group, Topic)) of
#message{from = ClientId} = Msg,
case pick(strategy(), ClientId, Group, Topic) of
false -> Delivery;
SubPid -> SubPid ! {dispatch, Topic, Msg},
Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]}
end.
pick([]) ->
false;
pick([SubPid]) ->
SubPid;
pick(SubPids) ->
lists:nth(rand:uniform(length(SubPids)), SubPids).
pick(sticky, ClientId, Group, Topic) ->
Sub0 = erlang:get(shared_sub_sticky),
case is_sub_alive(Sub0) of
true ->
%% the old subscriber is still alive
%% keep using it for sticky strategy
Sub0;
false ->
%% randomly pick one for the first message
Sub = do_pick(random, ClientId, Group, Topic),
%% stick to whatever pick result
erlang:put(shared_sub_sticky, Sub),
Sub
end;
pick(Strategy, ClientId, Group, Topic) ->
do_pick(Strategy, ClientId, Group, Topic).
do_pick(Strategy, ClientId, Group, Topic) ->
All = subscribers(Group, Topic),
pick_subscriber(Strategy, ClientId, All).
pick_subscriber(_, _ClientId, []) -> false;
pick_subscriber(_, _ClientId, [Sub]) -> Sub;
pick_subscriber(Strategy, ClientId, Subs) ->
Nth = do_pick_subscriber(Strategy, ClientId, length(Subs)),
lists:nth(Nth, Subs).
do_pick_subscriber(random, _ClientId, Count) ->
rand:uniform(Count);
do_pick_subscriber(hash, ClientId, Count) ->
1 + erlang:phash2(ClientId) rem Count;
do_pick_subscriber(round_robin, _ClientId, Count) ->
Rem = case erlang:get(shared_sub_round_robin) of
undefined -> 0;
N -> (N + 1) rem Count
end,
_ = erlang:put(shared_sub_round_robin, Rem),
Rem + 1.
subscribers(Group, Topic) ->
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
%%-----------------------------------------------------------------------------
%% gen_server callbacks
%%-----------------------------------------------------------------------------
@ -104,6 +137,7 @@ subscribers(Group, Topic) ->
init([]) ->
{atomic, PMon} = mnesia:transaction(fun init_monitors/0),
mnesia:subscribe({table, ?TAB, simple}),
ets:new(?ALIVE_SUBS, [named_table, {read_concurrency, true}, protected]),
{ok, update_stats(#state{pmon = PMon})}.
init_monitors() ->
@ -117,8 +151,9 @@ handle_call(Req, _From, State) ->
{reply, ignored, State}.
handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) ->
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
NewPmon = emqx_pmon:monitor(SubPid, PMon),
ets:insert(?ALIVE_SUBS, {SubPid}),
{noreply, update_stats(State#state{pmon = NewPmon})};
handle_cast(Msg, State) ->
emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]),
{noreply, State}.
@ -154,6 +189,7 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
cleanup_down(SubPid) ->
ets:delete(?ALIVE_SUBS, SubPid),
lists:foreach(
fun(Record) ->
mnesia:dirty_delete_object(?TAB, Record)
@ -162,3 +198,7 @@ cleanup_down(SubPid) ->
update_stats(State) ->
emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State.
%% erlang:is_process_alive/1 is expensive
%% and does not work with remote pids
is_sub_alive(Sub) -> [] =/= ets:lookup(?ALIVE_SUBS, Sub).

View File

@ -49,8 +49,18 @@
-define(PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, <<"publish">>)).
-define(PAYLOAD, [{type,"dsmSimulationData"},
{id, 9999},
{status, "running"},
{soc, 1536702170},
{fracsec, 451000},
{data, lists:seq(1, 20480)}]).
-define(BIG_PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, emqx_json:encode(?PAYLOAD))).
all() ->
[{group, connect}].
[{group, connect},
{group, publish}].
groups() ->
[{connect, [non_parallel_tests],
@ -60,6 +70,10 @@ groups() ->
mqtt_connect_with_ssl_oneway,
mqtt_connect_with_ssl_twoway,
mqtt_connect_with_ws
]},
{publish, [non_parallel_tests],
[
packet_size
]}].
init_per_suite(Config) ->
@ -157,6 +171,21 @@ mqtt_connect_with_ws(_Config) ->
{close, _} = rfc6455_client:close(WS),
ok.
%%issue 1811
packet_size(_Config) ->
{ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000),
Packet = raw_send_serialise(?CLIENT),
emqx_client_sock:send(Sock, Packet),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data),
%% Pub Packet QoS 1
PubPacket = raw_send_serialise(?BIG_PUBPACKET),
emqx_client_sock:send(Sock, PubPacket),
{ok, Data1} = gen_tcp:recv(Sock, 0),
{ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(Data1),
emqx_client_sock:close(Sock).
raw_send_serialise(Packet) ->
emqx_frame:serialize(Packet).

53
test/emqx_gc_tests.erl Normal file
View File

@ -0,0 +1,53 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_gc_tests).
-include_lib("eunit/include/eunit.hrl").
trigger_by_cnt_test() ->
Args = #{count => 2, bytes => 0},
ok = emqx_gc:init(Args),
ok = emqx_gc:inc(1, 1000),
St1 = inspect(),
?assertMatch({_, Remain} when Remain > 0, maps:get(cnt, St1)),
ok = emqx_gc:inc(2, 2),
St2 = inspect(),
ok = emqx_gc:inc(0, 2000),
St3 = inspect(),
?assertEqual(St2, St3),
?assertMatch({N, N}, maps:get(cnt, St2)),
?assertNot(maps:is_key(oct, St2)),
ok.
trigger_by_oct_test() ->
Args = #{count => 2, bytes => 2},
ok = emqx_gc:init(Args),
ok = emqx_gc:inc(1, 1),
St1 = inspect(),
?assertMatch({_, Remain} when Remain > 0, maps:get(oct, St1)),
ok = emqx_gc:inc(2, 2),
St2 = inspect(),
?assertMatch({N, N}, maps:get(oct, St2)),
?assertMatch({M, M}, maps:get(cnt, St2)),
ok.
disabled_test() ->
Args = #{count => -1, bytes => false},
ok = emqx_gc:init(Args),
ok = emqx_gc:inc(1, 1),
?assertEqual(#{}, inspect()),
ok.
inspect() -> erlang:get(emqx_gc).

View File

@ -68,11 +68,8 @@ message_expired(_) ->
Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg),
timer:sleep(500),
?assertNot(emqx_message:is_expired(Msg1)),
{ok, 1} = emqx_message:check_expiry(Msg1),
timer:sleep(600),
?assert(emqx_message:is_expired(Msg1)),
expired = emqx_message:check_expiry(Msg1),
timer:sleep(1000),
Msg2 = emqx_message:update_expiry(Msg1),
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).

46
test/emqx_misc_tests.erl Normal file
View File

@ -0,0 +1,46 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_misc_tests).
-include_lib("eunit/include/eunit.hrl").
timer_cancel_flush_test() ->
Timer = emqx_misc:start_timer(0, foo),
ok = emqx_misc:cancel_timer(Timer),
receive {timeout, Timer, foo} -> error(unexpected)
after 0 -> ok
end.
shutdown_disabled_test() ->
self() ! foo,
?assertEqual(continue, conn_proc_mng_policy(0, 0)),
receive foo -> ok end,
?assertEqual(hibernate, conn_proc_mng_policy(0, 0)).
message_queue_too_long_test() ->
self() ! foo,
self() ! bar,
?assertEqual({shutdown, message_queue_too_long},
conn_proc_mng_policy(1, 0)),
receive foo -> ok end,
?assertEqual(continue, conn_proc_mng_policy(1, 0)),
receive bar -> ok end.
total_heap_size_too_large_test() ->
?assertEqual({shutdown, total_heap_size_too_large},
conn_proc_mng_policy(0, 1)).
conn_proc_mng_policy(L, S) ->
emqx_misc:conn_proc_mng_policy(#{message_queue_len => L,
total_heap_size => S}).

View File

@ -16,14 +16,12 @@
-behaviour(gen_server).
-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0]).
-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {clean_start, client_id, client_pid}).
-define(TAB, messages).
-record(state, {clean_start, client_id, client_pid, last_msg}).
start_link(ClientId) ->
gen_server:start_link(?MODULE, [ClientId], []).
@ -37,19 +35,14 @@ close_session(ClientPid, SessPid) ->
stop(CPid) ->
gen_server:call(CPid, stop).
get_last_message() ->
[{last_message, Msg}] = ets:lookup(?TAB, last_message),
Msg.
get_last_message(Pid) ->
gen_server:call(Pid, get_last_message).
init([ClientId]) ->
Result = lists:member(?TAB, ets:all()),
if Result == false ->
ets:new(?TAB, [set, named_table, public]);
true -> ok
end,
{ok,
#state{clean_start = true,
client_id = ClientId}
{ok, #state{clean_start = true,
client_id = ClientId,
last_msg = undefined
}
}.
handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
@ -58,31 +51,29 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
conn_pid => ClientPid,
clean_start => true,
username => undefined,
conn_props => undefined
expiry_interval => 0
},
{ok, SessPid} = emqx_sm:open_session(Attrs),
{reply, {ok, SessPid}, State#state{
clean_start = true,
{reply, {ok, SessPid},
State#state{clean_start = true,
client_id = ClientId,
client_pid = ClientPid
}};
handle_call({stop_session, SessPid}, _From, State) ->
emqx_sm:close_session(SessPid),
{stop, normal, ok, State};
handle_call(get_last_message, _From, #state{last_msg = Msg} = State) ->
{reply, Msg, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({_, Msg}, State) ->
ets:insert(?TAB, {last_message, Msg}),
{noreply, State};
handle_info({deliver, Msg}, State) ->
{noreply, State#state{last_msg = Msg}};
handle_info(_Info, State) ->
{noreply, State}.

65
test/emqx_pool_SUITE.erl Normal file
View File

@ -0,0 +1,65 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_pool_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> [
{group, submit_case},
{group, async_submit_case}
].
groups() ->
[
{submit_case, [sequence], [submit_mfa, submit_fa]},
{async_submit_case, [sequence], [async_submit_mfa]}
].
init_per_suite(Config) ->
application:ensure_all_started(gproc),
Config.
end_per_suite(_Config) ->
ok.
submit_mfa(_Config) ->
erlang:process_flag(trap_exit, true),
{ok, Pid} = emqx_pool:start_link(),
Result = emqx_pool:submit({?MODULE, test_mfa, []}),
?assertEqual(15, Result),
gen_server:stop(Pid, normal, 3000),
ok.
submit_fa(_Config) ->
{ok, Pid} = emqx_pool:start_link(),
Fun = fun(X) -> case X rem 2 of 0 -> {true, X div 2}; _ -> false end end,
Result = emqx_pool:submit(Fun, [2]),
?assertEqual({true, 1}, Result),
exit(Pid, normal).
test_mfa() ->
lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]).
async_submit_mfa(_Config) ->
{ok, Pid} = emqx_pool:start_link(),
emqx_pool:async_submit({?MODULE, test_mfa, []}),
exit(Pid, normal).

View File

@ -40,7 +40,7 @@ t_session_all(_) ->
[{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}),
emqx_session:publish(SPid, 1, Message1),
timer:sleep(200),
{publish, 1, _} = emqx_mock_client:get_last_message(),
{publish, 1, _} = emqx_mock_client:get_last_message(ConnPid),
emqx_session:puback(SPid, 2),
emqx_session:puback(SPid, 3, reasoncode),
emqx_session:pubrec(SPid, 4),

View File

@ -0,0 +1,198 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_shared_sub_SUITE).
-export([all/0, init_per_suite/1, end_per_suite/1]).
-export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]).
-include("emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
all() -> [t_random_basic, t_random, t_round_robin, t_sticky, t_hash, t_not_so_sticky].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
t_random_basic(_) ->
application:set_env(?APPLICATION, shared_subscription_strategy, random),
ClientId = <<"ClientId">>,
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
{ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
Message1 = emqx_message:make(<<"ClientId">>, 2, <<"foo">>, <<"hello">>),
emqx_session:subscribe(SPid, [{<<"foo">>, #{qos => 2, share => <<"group1">>}}]),
%% wait for the subscription to show up
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000),
emqx_session:publish(SPid, 1, Message1),
?wait(case emqx_mock_client:get_last_message(ConnPid) of
{publish, 1, _} -> true;
Other -> Other
end, 1000),
emqx_session:puback(SPid, 2),
emqx_session:puback(SPid, 3, reasoncode),
emqx_session:pubrec(SPid, 4),
emqx_session:pubrec(SPid, 5, reasoncode),
emqx_session:pubrel(SPid, 6, reasoncode),
emqx_session:pubcomp(SPid, 7, reasoncode),
emqx_mock_client:close_session(ConnPid, SPid),
ok.
t_random(_) ->
test_two_messages(random).
t_round_robin(_) ->
test_two_messages(round_robin).
t_sticky(_) ->
test_two_messages(sticky).
t_hash(_) ->
test_two_messages(hash).
%% if the original subscriber dies, change to another one alive
t_not_so_sticky(_) ->
application:set_env(?APPLICATION, shared_subscription_strategy, sticky),
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqx_mock_client:start_link(ClientId1),
{ok, ConnPid2} = emqx_mock_client:start_link(ClientId2),
{ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal),
{ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal),
Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>),
emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
%% wait for the subscription to show up
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}], 1000),
emqx_session:publish(SPid1, 1, Message1),
?wait(case emqx_mock_client:get_last_message(ConnPid1) of
{publish, _, #message{payload = <<"hello1">>}} -> true;
Other -> Other
end, 1000),
emqx_mock_client:close_session(ConnPid1, SPid1),
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [], 1000),
emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]),
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000),
emqx_session:publish(SPid2, 2, Message2),
?wait(case emqx_mock_client:get_last_message(ConnPid2) of
{publish, _, #message{payload = <<"hello2">>}} -> true;
Other -> Other
end, 1000),
emqx_mock_client:close_session(ConnPid2, SPid2),
?wait(ets:tab2list(emqx_alive_shared_subscribers) =:= [], 1000),
ok.
test_two_messages(Strategy) ->
application:set_env(?APPLICATION, shared_subscription_strategy, Strategy),
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqx_mock_client:start_link(ClientId1),
{ok, ConnPid2} = emqx_mock_client:start_link(ClientId2),
{ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal),
{ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal),
Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>),
emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
emqx_session:subscribe(SPid2, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
%% wait for the subscription to show up
?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso
ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000),
emqx_session:publish(SPid1, 1, Message1),
Me = self(),
WaitF = fun(ExpectedPayload) ->
case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of
{true, Pid} ->
Me ! {subscriber, Pid},
true;
Other ->
Other
end
end,
?wait(WaitF(<<"hello1">>), 2000),
UsedSubPid1 = receive {subscriber, P1} -> P1 end,
%% publish both messages with SPid1
emqx_session:publish(SPid1, 2, Message2),
?wait(WaitF(<<"hello2">>), 2000),
UsedSubPid2 = receive {subscriber, P2} -> P2 end,
case Strategy of
sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2);
round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2);
hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
_ -> ok
end,
emqx_mock_client:close_session(ConnPid1, SPid1),
emqx_mock_client:close_session(ConnPid2, SPid2),
ok.
last_message(_ExpectedPayload, []) -> <<"not yet?">>;
last_message(ExpectedPayload, [Pid | Pids]) ->
case emqx_mock_client:get_last_message(Pid) of
{publish, _, #message{payload = ExpectedPayload}} -> {true, Pid};
_Other -> last_message(ExpectedPayload, Pids)
end.
%%------------------------------------------------------------------------------
%% help functions
%%------------------------------------------------------------------------------
wait_for(Fn, Ln, F, Timeout) ->
{Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end),
wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) ->
receive
{'DOWN', Mref, process, Pid, normal} ->
ok;
{'DOWN', Mref, process, Pid, {C, E, S}} ->
erlang:raise(C, {Fn, Ln, E}, S)
after
Timeout ->
case Kill of
true ->
erlang:demonitor(Mref, [flush]),
erlang:exit(Pid, kill),
erlang:error({Fn, Ln, timeout});
false ->
Pid ! stop,
wait_for_down(Fn, Ln, Timeout, Pid, Mref, true)
end
end.
wait_loop(_F, true) -> exit(normal);
wait_loop(F, LastRes) ->
Res = catch_call(F),
receive
stop -> erlang:exit(LastRes)
after
100 -> wait_loop(F, Res)
end.
catch_call(F) ->
try
case F() of
true -> true;
Other -> erlang:error({unexpected, Other})
end
catch
C : E : S ->
{C, E, S}
end.

View File

@ -25,7 +25,7 @@ t_open_close_session(_) ->
emqx_ct_broker_helpers:run_setup_steps(),
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid,
zone => internal, username => <<"zhou">>, conn_props => #{}},
zone => internal, username => <<"zhou">>, expiry_interval => 0},
{ok, SPid} = emqx_sm:open_session(Attrs),
[{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
SPid = emqx_sm:lookup_session_pid(<<"client">>),