diff --git a/.gitignore b/.gitignore index 0927f0bbc..80e3bf42d 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,5 @@ bbmustache/ etc/gen.emqx.conf compile_commands.json cuttlefish +rebar.lock +xrefr diff --git a/.travis.yml b/.travis.yml index e4088022d..adef0f3cd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,8 +8,10 @@ before_install: script: - make dep-vsn-check + - make rebar-compile - make rebar-eunit - make rebar-ct + - make rebar-cover - make coveralls sudo: false diff --git a/Makefile b/Makefile index 07c097610..09f6a7ad8 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0 dep_gproc = git https://github.com/uwiger/gproc 0.8.0 dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.2.0 dep_lager = git https://github.com/erlang-lager/lager 3.6.5 -dep_esockd = git https://github.com/emqx/esockd v5.4 +dep_esockd = git https://github.com/emqx/esockd v5.4.1 dep_ekka = git https://github.com/emqx/ekka v0.4.1 dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0 dep_clique = git https://github.com/emqx/clique develop @@ -18,7 +18,7 @@ dep_emqx_passwd = git https://github.com/emqx/emqx-passwd win30 NO_AUTOPATCH = cuttlefish -ERLC_OPTS += +debug_info +ERLC_OPTS += +debug_info -DAPPLICATION=emqx ERLC_OPTS += +'{parse_transform, lager_transform}' BUILD_DEPS = cuttlefish @@ -27,7 +27,7 @@ dep_cuttlefish = git https://github.com/emqx/cuttlefish emqx30 #TEST_DEPS = emqx_ct_helplers #dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers -TEST_ERLC_OPTS += +debug_info +TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx TEST_ERLC_OPTS += +'{parse_transform, lager_transform}' EUNIT_OPTS = verbose @@ -39,7 +39,7 @@ CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ - emqx_mountpoint emqx_listeners emqx_protocol + emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) @@ -60,7 +60,7 @@ gen-clean: @rm -f etc/gen.emqx.conf bbmustache: - $(verbose) git clone https://github.com/soranoba/bbmustache.git && pushd bbmustache && ./rebar3 compile && popd + $(verbose) git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd .. # This hack is to generate a conf file for testing # relx overlay is used for release @@ -78,26 +78,29 @@ app.config: etc/gen.emqx.conf ct: cuttlefish app.config +rebar-cover: + @rebar3 cover + coveralls: @rebar3 coveralls send -cuttlefish: deps - @mv ./deps/cuttlefish/cuttlefish ./cuttlefish -rebar-cuttlefish: rebar-deps - @make -C _build/default/lib/cuttlefish - @mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish +cuttlefish: rebar-deps + @if [ ! -f cuttlefish ]; then \ + make -C _build/default/lib/cuttlefish; \ + mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish; \ + fi rebar-deps: @rebar3 get-deps -rebar-eunit: +rebar-eunit: cuttlefish @rebar3 eunit rebar-compile: @rebar3 compile -rebar-ct: rebar-cuttlefish app.config +rebar-ct: cuttlefish app.config @rebar3 as test compile @ln -s -f '../../../../etc' _build/test/lib/emqx/ @ln -s -f '../../../../data' _build/test/lib/emqx/ diff --git a/etc/emqx.conf b/etc/emqx.conf index 87665c194..dc91db930 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -515,25 +515,31 @@ zone.external.idle_timeout = 15s ## Publish limit for the external MQTT connections. ## -## Value: rate,burst -## Default: 10 messages per second, and 100 messages burst. -## zone.external.publish_limit = 10,100 - -## Enable ban check. -## -## Value: Flag -zone.external.enable_ban = on +## Value: Number,Duration +## Example: 10 messages per minute. +## zone.external.publish_limit = 10,1m ## Enable ACL check. ## ## Value: Flag zone.external.enable_acl = on +## Enable ban check. +## +## Value: Flag +zone.external.enable_ban = on + ## Enable per connection statistics. ## ## Value: on | off zone.external.enable_stats = on +## Force MQTT connection/session process GC after this number of +## messages | bytes passed through. +## +## Numbers delimited by `|'. Zero or negative is to disable. +zone.external.force_gc_policy = 1000|1MB + ## Maximum MQTT packet size allowed. ## ## Value: Bytes @@ -750,7 +756,7 @@ listener.tcp.external.zone = external ## ## Value: rate,burst ## Unit: Bps -## listener.tcp.external.rate_limit = 1024,4096 +listener.tcp.external.rate_limit = 1024,4096 ## The access control rules for the MQTT/TCP listener. ## @@ -1197,6 +1203,11 @@ listener.ssl.external.reuseaddr = true ## Examples: 8083, 127.0.0.1:8083, ::1:8083 listener.ws.external = 8083 +## The path of WebSocket MQTT endpoint +## +## Value: URL Path +listener.ws.external.mqtt_path = /mqtt + ## The acceptor pool for external MQTT/WebSocket listener. ## ## Value: Number @@ -1336,6 +1347,11 @@ listener.ws.external.nodelay = true ## Examples: 8084, 127.0.0.1:8084, ::1:8084 listener.wss.external = 8084 +## The path of WebSocket MQTT endpoint +## +## Value: URL Path +listener.wss.external.mqtt_path = /mqtt + ## The acceptor pool for external MQTT/WebSocket/SSL listener. ## ## Value: Number @@ -1531,241 +1547,257 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ##-------------------------------------------------------------------- ##-------------------------------------------------------------------- -## Bridges to edge +## Bridges to aws ##-------------------------------------------------------------------- -## Bridge type. -## -## Value: Enum -## Example: out | in -bridge.edge.type = in - -## Bridge address: node name for local bridge, host:port for remote. -## -## Value: String -## Example: emqx@127.0.0.1, 127.0.0.1:1883 -bridge.edge.address = 127.0.0.1:1883 - -## Protocol version of the bridge. -## -## Value: Enum -## - mqtt5 -## - mqtt4 -## - mqtt3 -bridge.edge.proto_ver = mqtt4 - -## The ClientId of a remote bridge. -## -## Value: String -bridge.edge.client_id = bridge_edge - -## The Clean start flag of a remote bridge. -## -## Value: boolean -bridge.edge.clean_start = false - -## The username for a remote bridge. -## -## Value: String -bridge.edge.username = user - -## The password for a remote bridge. -## -## Value: String -bridge.edge.password = passwd - -## Mountpoint of the bridge. -## -## Value: String -## bridge.edge.mountpoint = bridge/edge/ - -## Ping interval of a down bridge. -## -## Value: Duration -## Default: 10 seconds -bridge.edge.keepalive = 10s - -## Subscriptions of the bridge topic. -## -## Value: String -bridge.edge.subscription.1.topic = # - -## Subscriptions of the bridge qos. -## -## Value: Number -bridge.edge.subscription.1.qos = 1 - -## The pending message queue of a bridge. -## -## Value: Number -bridge.edge.max_pending_messages = 10000 - ## Start type of the bridge. ## ## Value: enum ## manual ## auto -bridge.edge.start_type = manual - -## Bridge reconnect count. -## -## Value: Number -bridge.edge.reconnect_count = 10 +bridge.aws.start_type = manual ## Bridge reconnect time. ## ## Value: Duration ## Default: 30 seconds -bridge.edge.reconnect_time = 30s - -## PEM-encoded CA certificates of the bridge. -## -## Value: File -## bridge.edge.cacertfile = cacert.pem - -## SSL Certfile of the bridge. -## -## Value: File -## bridge.edge.certfile = cert.pem - -## SSL Keyfile of the bridge. -## -## Value: File -## bridge.edge.keyfile = key.pem - -## SSL Ciphers used by the bridge. -## -## Value: String -## bridge.edge.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 - -## TLS versions used by the bridge. -## -## Value: String -## bridge.edge.tls_versions = tlsv1.2,tlsv1.1,tlsv1 - - -##-------------------------------------------------------------------- -## Bridges to cloud -##-------------------------------------------------------------------- -## Bridge type. -## -## Value: Enum -## Example: out | in -bridge.cloud.type = out +bridge.aws.reconnect_interval = 30s ## Bridge address: node name for local bridge, host:port for remote. ## ## Value: String ## Example: emqx@127.0.0.1, 127.0.0.1:1883 -bridge.cloud.address = 127.0.0.1:1883 +bridge.aws.address = 127.0.0.1:1883 ## Protocol version of the bridge. ## ## Value: Enum -## - mqtt5 -## - mqtt4 -## - mqtt3 -bridge.cloud.proto_ver = mqtt4 +## - mqttv5 +## - mqttv4 +## - mqttv3 +bridge.aws.proto_ver = mqttv4 ## The ClientId of a remote bridge. ## ## Value: String -bridge.cloud.client_id = bridge_cloud +bridge.aws.client_id = bridge_aws ## The Clean start flag of a remote bridge. ## ## Value: boolean -bridge.cloud.clean_start = false +bridge.aws.clean_start = false ## The username for a remote bridge. ## ## Value: String -bridge.cloud.username = user +bridge.aws.username = user ## The password for a remote bridge. ## ## Value: String -bridge.cloud.password = passwd +bridge.aws.password = passwd ## Mountpoint of the bridge. ## ## Value: String -bridge.cloud.mountpoint = bridge/edge/${node}/ +bridge.aws.mountpoint = bridge/aws/${node}/ ## Ping interval of a down bridge. ## ## Value: Duration ## Default: 10 seconds -bridge.cloud.keepalive = 10s +bridge.aws.keepalive = 60s ## Forward message topics ## ## Value: String ## Example: topic1/#,topic2/# -bridge.cloud.forward_rule = # +bridge.aws.forwards = topic1/#,topic2/# ## Subscriptions of the bridge topic. ## ## Value: String -bridge.cloud.subscription.1.topic = $share/cmd/topic1 +bridge.aws.subscription.1.topic = cmd/topic1 ## Subscriptions of the bridge qos. ## ## Value: Number -bridge.cloud.subscription.1.qos = 1 +bridge.aws.subscription.1.qos = 1 -## Bridge store message type. +## Subscriptions of the bridge topic. +## +## Value: String +bridge.aws.subscription.2.topic = cmd/topic2 + +## Subscriptions of the bridge qos. +## +## Value: Number +bridge.aws.subscription.2.qos = 1 + +## Bridge message queue message type. ## ## Value: Enum ## Example: memory | disk -bridge.cloud.store_type = memory +bridge.aws.mqueue_type = memory ## The pending message queue of a bridge. ## ## Value: Number -bridge.cloud.max_pending_messages = 10000 +bridge.aws.max_pending_messages = 10000 + +## PEM-encoded CA certificates of the bridge. +## +## Value: File +## bridge.aws.cacertfile = cacert.pem + +## SSL Certfile of the bridge. +## +## Value: File +## bridge.aws.certfile = cert.pem + +## SSL Keyfile of the bridge. +## +## Value: File +## bridge.aws.keyfile = key.pem + +## SSL Ciphers used by the bridge. +## +## Value: String +## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 + +## TLS versions used by the bridge. +## +## Value: String +## bridge.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1 + +##-------------------------------------------------------------------- +## Bridges to azure +##-------------------------------------------------------------------- ## Start type of the bridge. ## ## Value: enum ## manual ## auto -bridge.cloud.start_type = manual +## bridge.azure.start_type = manual ## Bridge reconnect count. ## ## Value: Number -bridge.cloud.reconnect_count = 10 +## bridge.azure.reconnect_count = 10 ## Bridge reconnect time. ## ## Value: Duration ## Default: 30 seconds -bridge.cloud.reconnect_time = 30s +## bridge.azure.reconnect_time = 30s + +## Bridge address: node name for local bridge, host:port for remote. +## +## Value: String +## Example: emqx@127.0.0.1, 127.0.0.1:1883 +## bridge.azure.address = 127.0.0.1:1883 + +## Protocol version of the bridge. +## +## Value: Enum +## - mqttv5 +## - mqttv4 +## - mqttv3 +## bridge.azure.proto_ver = mqttv4 + +## The ClientId of a remote bridge. +## +## Value: String +## bridge.azure.client_id = bridge_azure + +## The Clean start flag of a remote bridge. +## +## Value: boolean +## bridge.azure.clean_start = false + +## The username for a remote bridge. +## +## Value: String +## bridge.azure.username = user + +## The password for a remote bridge. +## +## Value: String +## bridge.azure.password = passwd + +## Mountpoint of the bridge. +## +## Value: String +## bridge.azure.mountpoint = bridge/azure/${node}/ + +## Ping interval of a down bridge. +## +## Value: Duration +## Default: 10 seconds +## bridge.azure.keepalive = 10s + +## Forward message topics +## +## Value: String +## Example: topic1/#,topic2/# +## bridge.azure.forwards = topic1/#,topic2/# + +## Subscriptions of the bridge topic. +## +## Value: String +## bridge.azure.subscription.1.topic = $share/cmd/topic1 + +## Subscriptions of the bridge qos. +## +## Value: Number +## bridge.azure.subscription.1.qos = 1 + +## Subscriptions of the bridge topic. +## +## Value: String +## bridge.azure.subscription.2.topic = $share/cmd/topic2 + +## Subscriptions of the bridge qos. +## +## Value: Number +## bridge.azure.subscription.2.qos = 1 + +## Bridge store message type. +## +## Value: Enum +## Example: memory | disk +## bridge.azure.store_type = memory + +## The pending message queue of a bridge. +## +## Value: Number +## bridge.azure.max_pending_messages = 10000 + ## PEM-encoded CA certificates of the bridge. ## ## Value: File -## bridge.cloud.cacertfile = cacert.pem +## bridge.azure.cacertfile = cacert.pem ## SSL Certfile of the bridge. ## ## Value: File -## bridge.cloud.certfile = cert.pem +## bridge.azure.certfile = cert.pem ## SSL Keyfile of the bridge. ## ## Value: File -## bridge.cloud.keyfile = key.pem +## bridge.azure.keyfile = key.pem ## SSL Ciphers used by the bridge. ## ## Value: String -## bridge.cloud.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 +## bridge.azure.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 ## TLS versions used by the bridge. ## ## Value: String -## bridge.cloud.tls_versions = tlsv1.2,tlsv1.1,tlsv1 +## bridge.azure.tls_versions = tlsv1.2,tlsv1.1,tlsv1 ##-------------------------------------------------------------------- ## Modules @@ -1819,7 +1851,7 @@ module.rewrite = off ## The etc dir for plugins' config. ## ## Value: Folder -plugins.etc_dir ={{ platform_etc_dir }}/plugins/ +plugins.etc_dir = {{ platform_etc_dir }}/plugins/ ## The file to store loaded plugin names. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 4d2c4c3bd..9172e401e 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -649,14 +649,14 @@ end}. {datatype, {enum, [allow, deny]}} ]}. -%% @doc Enable Ban. -{mapping, "zone.$name.enable_ban", "emqx.zones", [ +%% @doc Enable ACL check. +{mapping, "zone.$name.enable_acl", "emqx.zones", [ {default, off}, {datatype, flag} ]}. -%% @doc Enable ACL check. -{mapping, "zone.$name.enable_acl", "emqx.zones", [ +%% @doc Enable Ban. +{mapping, "zone.$name.enable_ban", "emqx.zones", [ {default, off}, {datatype, flag} ]}. @@ -669,7 +669,6 @@ end}. %% @doc Publish limit of the MQTT connections. {mapping, "zone.$name.publish_limit", "emqx.zones", [ - {default, undefined}, {datatype, string} ]}. @@ -770,7 +769,7 @@ end}. %% @doc Session Expiry Interval {mapping, "zone.$name.session_expiry_interval", "emqx.zones", [ {default, "2h"}, - {datatype, {duration, ms}} + {datatype, {duration, s}} ]}. %% @doc Type: simple | priority @@ -797,6 +796,25 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Force connection/session process GC after this number of +%% messages | bytes passed through. +%% Numbers delimited by `|'. Zero or negative is to disable. +{mapping, "zone.$name.force_gc_policy", "emqx.zones", [ + {default, "0 | 0MB"}, + {datatype, string} + ]}. + +%% @doc Max message queue length and total heap size to force shutdown +%% connection/session process. +%% Message queue here is the Erlang process mailbox, but not the number +%% of queued MQTT messages of QoS 1 and 2. +%% Total heap size is the in Erlang 'words' not in 'bytes'. +%% Zero or negative is to disable. +{mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [ + {default, "0 | 0MB"}, + {datatype, string} +]}. + {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> {mqtt_retain_available, Val}; @@ -804,6 +822,34 @@ end}. {mqtt_wildcard_subscription, Val}; ("shared_subscription", Val) -> {mqtt_shared_subscription, Val}; + ("publish_limit", Val) -> + [Limit, Duration] = string:tokens(Val, ", "), + PubLimit = case cuttlefish_duration:parse(Duration, s) of + Secs when is_integer(Secs) -> + {list_to_integer(Limit) / Secs, list_to_integer(Limit)}; + {error, Reason} -> + error(Reason) + end, + {publish_limit, PubLimit}; + ("force_gc_policy", Val) -> + [Count, Bytes] = string:tokens(Val, "| "), + GcPolicy = case cuttlefish_bytesize:parse(Bytes) of + {error, Reason} -> + error(Reason); + Bytes1 -> + #{bytes => Bytes1, count => list_to_integer(Count)} + end, + {force_gc_policy, GcPolicy}; + ("force_shutdown_policy", Val) -> + [Len, Siz] = string:tokens(Val, "| "), + ShutdownPolicy = case cuttlefish_bytesize:parse(Siz) of + {error, Reason} -> + error(Reason); + Siz1 -> + #{message_queue_len => list_to_integer(Len), + total_heap_size => Siz1} + end, + {force_shutdown_policy, ShutdownPolicy}; (Opt, Val) -> {list_to_atom(Opt), Val} end, @@ -1067,6 +1113,11 @@ end}. {datatype, [integer, ip]} ]}. +{mapping, "listener.ws.$name.mqtt_path", "emqx.listeners", [ + {default, "/mqtt"}, + {datatype, string} +]}. + {mapping, "listener.ws.$name.acceptors", "emqx.listeners", [ {default, 8}, {datatype, integer} @@ -1168,6 +1219,11 @@ end}. {datatype, [integer, ip]} ]}. +{mapping, "listener.wss.$name.mqtt_path", "emqx.listeners", [ + {default, "/mqtt"}, + {datatype, string} +]}. + {mapping, "listener.wss.$name.acceptors", "emqx.listeners", [ {default, 8}, {datatype, integer} @@ -1338,7 +1394,8 @@ end}. end, LisOpts = fun(Prefix) -> - Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + {mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)}, {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, @@ -1416,12 +1473,7 @@ end}. %%-------------------------------------------------------------------- %% Bridges %%-------------------------------------------------------------------- - -{mapping, "bridge.$name.type", "emqx.bridges", [ - {datatype, {enum, [in, out]}} -]}. - -{mapping, "bridge.$name.store_type", "emqx.bridges", [ +{mapping, "bridge.$name.mqueue_type", "emqx.bridges", [ {datatype, {enum, [memory, disk]}} ]}. @@ -1430,7 +1482,7 @@ end}. ]}. {mapping, "bridge.$name.proto_ver", "emqx.bridges", [ - {datatype, {enum, [mqtt3, mqtt4, mqtt5]}} + {datatype, {enum, [mqttv3, mqttv4, mqttv5]}} ]}. {mapping, "bridge.$name.client_id", "emqx.bridges", [ @@ -1454,7 +1506,7 @@ end}. {datatype, string} ]}. -{mapping, "bridge.$name.forward_rule", "emqx.bridges", [ +{mapping, "bridge.$name.forwards", "emqx.bridges", [ {datatype, string} ]}. @@ -1501,12 +1553,7 @@ end}. {default, auto} ]}. -{mapping, "bridge.$name.reconnect_count", "emqx.bridges", [ - {default, 10}, - {datatype, integer} -]}. - -{mapping, "bridge.$name.reconnect_time", "emqx.bridges", [ +{mapping, "bridge.$name.reconnect_interval", "emqx.bridges", [ {default, "30s"}, {datatype, {duration, s}} ]}. @@ -1660,9 +1707,16 @@ end}. {datatype, {enum, [local,one,quorum,all]}} ]}. +%% @doc Shared Subscription Dispatch Strategy. {mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [ - {default, random}, - {datatype, {enum, [random, round_robbin, hash]}} + {default, round_robbin}, + {datatype, + {enum, + [random, %% randomly pick a subscriber + round_robbin, %% round robin alive subscribers one message after another + sticky, %% pick a random subscriber and stick to it + hash %% hash client ID to a group member + ]}} ]}. {mapping, "broker.route_batch_clean", "emqx.route_batch_clean", [ @@ -1712,3 +1766,4 @@ end}. {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}. + diff --git a/rebar.config b/rebar.config index d7d14c883..aa77f3a02 100644 --- a/rebar.config +++ b/rebar.config @@ -20,7 +20,8 @@ warn_unused_import, warn_obsolete_guard, debug_info, - {parse_transform, lager_transform}]}. + {parse_transform, lager_transform}, + {d, 'APPLICATION', emqx}]}. {xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, warnings_as_errors, deprecated_functions]}. @@ -29,10 +30,5 @@ {cover_export_enabled, true}. %% rebar3_neotoma_plugin is needed to compile the .peg file for cuttlefish -{plugins, [rebar3_neotoma_plugin]}. +{plugins, [coveralls, rebar3_neotoma_plugin]}. -%% Do not include cuttlefish's dependencies as mine -%% its dependencies are only fetched to compile itself -%% they are however not needed by emqx -{overrides, [{override, cuttlefish, [{deps, []}]} - ]}. diff --git a/rebar.config.script b/rebar.config.script index 7c247ac48..0b18592f1 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -3,7 +3,6 @@ CONFIG1 = case os:getenv("TRAVIS") of "true" -> JobId = os:getenv("TRAVIS_JOB_ID"), [{coveralls_service_job_id, JobId}, - {plugins, [coveralls]}, {coveralls_coverdata, "_build/test/cover/*.coverdata"}, {coveralls_service_name , "travis-ci"} | CONFIG]; _ -> diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index a7ce2581d..461564bd2 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -23,12 +23,16 @@ -export([start_link/2, start_bridge/1, stop_bridge/1, status/1]). +-export([show_forwards/1, add_forward/2, del_forward/2]). + +-export([show_subscriptions/1, add_subscription/3, del_subscription/2]). + -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {client_pid, options, reconnect_time, reconnect_count, - def_reconnect_count, type, mountpoint, queue, store_type, - max_pending_messages}). +-record(state, {client_pid, options, reconnect_interval, + mountpoint, queue, mqueue_type, max_pending_messages, + forwards = [], subscriptions = []}). -record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false, packet_id, topic, props, payload}). @@ -42,6 +46,50 @@ start_bridge(Name) -> stop_bridge(Name) -> gen_server:call(name(Name), stop_bridge). +-spec(show_forwards(atom()) -> list()). +show_forwards(Name) -> + gen_server:call(name(Name), show_forwards). + +-spec(add_forward(atom(), binary()) -> ok | {error, already_exists | validate_fail}). +add_forward(Name, Topic) -> + case catch emqx_topic:validate({filter, Topic}) of + true -> + gen_server:call(name(Name), {add_forward, Topic}); + {'EXIT', _Reason} -> + {error, validate_fail} + end. + +-spec(del_forward(atom(), binary()) -> ok | {error, validate_fail}). +del_forward(Name, Topic) -> + case catch emqx_topic:validate({filter, Topic}) of + true -> + gen_server:call(name(Name), {del_forward, Topic}); + _ -> + {error, validate_fail} + end. + +-spec(show_subscriptions(atom()) -> list()). +show_subscriptions(Name) -> + gen_server:call(name(Name), show_subscriptions). + +-spec(add_subscription(atom(), binary(), integer()) -> ok | {error, already_exists | validate_fail}). +add_subscription(Name, Topic, QoS) -> + case catch emqx_topic:validate({filter, Topic}) of + true -> + gen_server:call(name(Name), {add_subscription, Topic, QoS}); + {'EXIT', _Reason} -> + {error, validate_fail} + end. + +-spec(del_subscription(atom(), binary()) -> ok | {error, validate_fail}). +del_subscription(Name, Topic) -> + case catch emqx_topic:validate({filter, Topic}) of + true -> + gen_server:call(name(Name), {del_subscription, Topic}); + _ -> + {error, validate_fail} + end. + status(Pid) -> gen_server:call(Pid, status). @@ -55,41 +103,78 @@ init([Options]) -> manual -> ok; auto -> erlang:send_after(1000, self(), start) end, - ReconnectCount = get_value(reconnect_count, Options, 10), - ReconnectTime = get_value(reconnect_time, Options, 30000), + ReconnectInterval = get_value(reconnect_interval, Options, 30000), MaxPendingMsg = get_value(max_pending_messages, Options, 10000), Mountpoint = format_mountpoint(get_value(mountpoint, Options)), - StoreType = get_value(store_type, Options, memory), - Type = get_value(type, Options, in), + MqueueType = get_value(mqueue_type, Options, memory), Queue = [], - {ok, #state{type = Type, - mountpoint = Mountpoint, - queue = Queue, - store_type = StoreType, - options = Options, - reconnect_count = ReconnectCount, - reconnect_time = ReconnectTime, - def_reconnect_count = ReconnectCount, + {ok, #state{mountpoint = Mountpoint, + queue = Queue, + mqueue_type = MqueueType, + options = Options, + reconnect_interval = ReconnectInterval, max_pending_messages = MaxPendingMsg}}. handle_call(start_bridge, _From, State = #state{client_pid = undefined}) -> {noreply, NewState} = handle_info(start, State), - {reply, <<"start bridge successfully">>, NewState}; + {reply, #{msg => <<"start bridge successfully">>}, NewState}; handle_call(start_bridge, _From, State) -> - {reply, <<"bridge already started">>, State}; + {reply, #{msg => <<"bridge already started">>}, State}; handle_call(stop_bridge, _From, State = #state{client_pid = undefined}) -> - {reply, <<"bridge not started">>, State}; + {reply, #{msg => <<"bridge not started">>}, State}; handle_call(stop_bridge, _From, State = #state{client_pid = Pid}) -> emqx_client:disconnect(Pid), - {reply, <<"stop bridge successfully">>, State}; + {reply, #{msg => <<"stop bridge successfully">>}, State}; handle_call(status, _From, State = #state{client_pid = undefined}) -> - {reply, <<"Stopped">>, State}; + {reply, #{status => <<"Stopped">>}, State}; handle_call(status, _From, State = #state{client_pid = _Pid})-> - {reply, <<"Running">>, State}; + {reply, #{status => <<"Running">>}, State}; + +handle_call(show_forwards, _From, State = #state{forwards = Forwards}) -> + {reply, Forwards, State}; + +handle_call({add_forward, Topic}, _From, State = #state{forwards = Forwards}) -> + case not lists:member(Topic, Forwards) of + true -> + emqx_broker:subscribe(Topic), + {reply, ok, State#state{forwards = [Topic | Forwards]}}; + false -> + {reply, {error, already_exists}, State} + end; + +handle_call({del_forward, Topic}, _From, State = #state{forwards = Forwards}) -> + case lists:member(Topic, Forwards) of + true -> + emqx_broker:unsubscribe(Topic), + {reply, ok, State#state{forwards = lists:delete(Topic, Forwards)}}; + false -> + {reply, ok, State} + end; + +handle_call(show_subscriptions, _From, State = #state{subscriptions = Subscriptions}) -> + {reply, Subscriptions, State}; + +handle_call({add_subscription, Topic, Qos}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) -> + case not lists:keymember(Topic, 1, Subscriptions) of + true -> + emqx_client:subscribe(ClientPid, {Topic, Qos}), + {reply, ok, State#state{subscriptions = [{Topic, Qos} | Subscriptions]}}; + false -> + {reply, {error, already_exists}, State} + end; + +handle_call({del_subscription, Topic}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) -> + case lists:keymember(Topic, 1, Subscriptions) of + true -> + emqx_client:unsubscribe(ClientPid, Topic), + {reply, ok, State#state{subscriptions = lists:keydelete(Topic, 1, Subscriptions)}}; + false -> + {reply, ok, State} + end; handle_call(Req, _From, State) -> emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), @@ -99,46 +184,24 @@ handle_cast(Msg, State) -> emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info(start, State = #state{reconnect_count = 0}) -> - {noreply, State}; - %%---------------------------------------------------------------- -%% start in message bridge +%% start message bridge %%---------------------------------------------------------------- handle_info(start, State = #state{options = Options, client_pid = undefined, - reconnect_time = ReconnectTime, - reconnect_count = ReconnectCount, - type = in}) -> + reconnect_interval = ReconnectInterval}) -> case emqx_client:start_link([{owner, self()}|options(Options)]) of {ok, ClientPid, _} -> - Subs = get_value(subscriptions, Options, []), - [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], - {noreply, State#state{client_pid = ClientPid}}; + Subs = [{i2b(Topic), Qos} || {Topic, Qos} <- get_value(subscriptions, Options, []), + emqx_topic:validate({filter, i2b(Topic)})], + Forwards = [i2b(Topic) || Topic <- string:tokens(get_value(forwards, Options, ""), ","), + emqx_topic:validate({filter, i2b(Topic)})], + [emqx_client:subscribe(ClientPid, {Topic, Qos}) || {Topic, Qos} <- Subs], + [emqx_broker:subscribe(Topic) || Topic <- Forwards], + {noreply, State#state{client_pid = ClientPid, subscriptions = Subs, forwards = Forwards}}; {error,_} -> - erlang:send_after(ReconnectTime, self(), start), - {noreply, State#state{reconnect_count = ReconnectCount-1}} - end; - -%%---------------------------------------------------------------- -%% start out message bridge -%%---------------------------------------------------------------- -handle_info(start, State = #state{options = Options, - client_pid = undefined, - reconnect_time = ReconnectTime, - reconnect_count = ReconnectCount, - type = out}) -> - case emqx_client:start_link([{owner, self()}|options(Options)]) of - {ok, ClientPid, _} -> - Subs = get_value(subscriptions, Options, []), - [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], - ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","), - [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, - emqx_topic:validate({filter, i2b(Topic)})], - {noreply, State#state{client_pid = ClientPid}}; - {error,_} -> - erlang:send_after(ReconnectTime, self(), start), - {noreply, State#state{reconnect_count = ReconnectCount-1}} + erlang:send_after(ReconnectInterval, self(), start), + {noreply, State} end; %%---------------------------------------------------------------- @@ -146,14 +209,14 @@ handle_info(start, State = #state{options = Options, %%---------------------------------------------------------------- handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}}, State = #state{client_pid = Pid, mountpoint = Mountpoint, queue = Queue, - store_type = StoreType, max_pending_messages = MaxPendingMsg}) -> + mqueue_type = MqueueType, max_pending_messages = MaxPendingMsg}) -> Msg = #mqtt_msg{qos = 1, retain = Retain, topic = mountpoint(Mountpoint, Topic), payload = Payload}, case emqx_client:publish(Pid, Msg) of {ok, PkgId} -> - {noreply, State#state{queue = store(StoreType, {PkgId, Msg}, Queue, MaxPendingMsg)}}; + {noreply, State#state{queue = store(MqueueType, {PkgId, Msg}, Queue, MaxPendingMsg)}}; {error, Reason} -> emqx_logger:error("Publish fail:~p", [Reason]), {noreply, State} @@ -165,26 +228,25 @@ handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{r handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic, properties := Props, payload := Payload}}, State) -> NewMsg0 = emqx_message:make(bridge, QoS, Topic, Payload), - NewMsg1 = emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain=> Retain}, NewMsg0)), + NewMsg1 = emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain => Retain}, NewMsg0)), emqx_broker:publish(NewMsg1), {noreply, State}; %%---------------------------------------------------------------- %% received remote puback message %%---------------------------------------------------------------- -handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, store_type = StoreType}) -> +handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, mqueue_type = MqueueType}) -> % lists:keydelete(PkgId, 1, Queue) - {noreply, State#state{queue = delete(StoreType, PkgId, Queue)}}; + {noreply, State#state{queue = delete(MqueueType, PkgId, Queue)}}; handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) -> {noreply, State#state{client_pid = undefined}}; handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid, - reconnect_time = ReconnectTime, - def_reconnect_count = DefReconnectCount}) -> + reconnect_interval = ReconnectInterval}) -> lager:warning("emqx bridge stop reason:~p", [Reason]), - erlang:send_after(ReconnectTime, self(), start), - {noreply, State#state{client_pid = undefined, reconnect_count = DefReconnectCount}}; + erlang:send_after(ReconnectInterval, self(), start), + {noreply, State#state{client_pid = undefined}}; handle_info(Info, State) -> emqx_logger:error("[Bridge] unexpected info: ~p", [Info]), @@ -196,9 +258,9 @@ terminate(_Reason, #state{}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -proto_ver(mqtt3) -> v3; -proto_ver(mqtt4) -> v4; -proto_ver(mqtt5) -> v5. +proto_ver(mqttv3) -> v3; +proto_ver(mqttv4) -> v4; +proto_ver(mqttv5) -> v5. address(Address) -> case string:tokens(Address, ":") of [Host] -> {Host, 1883}; diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index bc8c0a532..3911da2a6 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -27,7 +27,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% @doc List all bridges --spec(bridges() -> [{node(), Status :: binary()}]). +-spec(bridges() -> [{node(), map()}]). bridges() -> [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)]. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index b2f1bb119..35e8276d2 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -70,7 +70,7 @@ subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> -spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid(), emqx_types:subid() | emqx_types:subopts()) -> ok). subscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> - subscribe(Topic, SubPid, SubId, #{}); + subscribe(Topic, SubPid, SubId, #{qos => 0}); subscribe(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(SubOpts) -> subscribe(Topic, SubPid, undefined, SubOpts); subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) -> diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 85d6ca59d..7ef9c5968 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -787,7 +787,7 @@ connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes), connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), State = #state{subscriptions = Subscriptions}) -> case take_call({unsubscribe, PacketId}, State) of - {value, #call{from = From, req = {_, Topics}}, NewState} -> + {value, #call{from = From, req = {_, _, Topics}}, NewState} -> Subscriptions1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index adda71450..3176ad443 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -148,7 +148,11 @@ init([Transport, RawSocket, Options]) -> proto_state = ProtoState, parser_state = ParserState, enable_stats = EnableStats, - idle_timeout = IdleTimout}), + idle_timeout = IdleTimout + }), + GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), + ok = emqx_gc:init(GcPolicy), + erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State, self(), IdleTimout); {error, Reason} -> @@ -200,16 +204,29 @@ handle_cast(Msg, State) -> handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> case emqx_protocol:deliver(PubOrAck, ProtoState) of {ok, ProtoState1} -> - {noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))}; + State1 = ensure_stats_timer(State#state{proto_state = ProtoState1}), + ok = maybe_gc(State1, PubOrAck), + {noreply, State1}; {error, Reason} -> shutdown(Reason, State) end; - handle_info({timeout, Timer, emit_stats}, - State = #state{stats_timer = Timer, proto_state = ProtoState}) -> + State = #state{stats_timer = Timer, + proto_state = ProtoState + }) -> emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), - {noreply, State#state{stats_timer = undefined}, hibernate}; - + NewState = State#state{stats_timer = undefined}, + Limits = erlang:get(force_shutdown_policy), + case emqx_misc:conn_proc_mng_policy(Limits) of + continue -> + {noreply, NewState}; + hibernate -> + ok = emqx_gc:reset(), + {noreply, NewState, hibernate}; + {shutdown, Reason} -> + ?LOG(warning, "shutdown due to ~p", [Reason], NewState), + shutdown(Reason, NewState) + end; handle_info(timeout, State) -> shutdown(idle_timeout, State); @@ -295,15 +312,16 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% Receive and parse data -handle_packet(<<>>, State) -> - {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))}; - +handle_packet(<<>>, State0) -> + State = ensure_stats_timer(ensure_rate_limit(State0)), + ok = maybe_gc(State, incoming), + {noreply, State}; handle_packet(Data, State = #state{proto_state = ProtoState, parser_state = ParserState, idle_timeout = IdleTimeout}) -> case catch emqx_frame:parse(Data, ParserState) of {more, NewParserState} -> - {noreply, State#state{parser_state = NewParserState}, IdleTimeout}; + {noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of @@ -381,7 +399,13 @@ shutdown(Reason, State) -> stop(Reason, State) -> {stop, Reason, State}. -maybe_gc(State) -> - %% TODO: gc and shutdown policy - State. +%% For incoming messages, bump gc-stats with packet count and totoal volume +%% For outgoing messages, only 'publish' type is taken into account. +maybe_gc(#state{incoming = #{bytes := Oct, packets := Cnt}}, incoming) -> + ok = emqx_gc:inc(Cnt, Oct); +maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> + Oct = iolist_size(Payload), + ok = emqx_gc:inc(1, Oct); +maybe_gc(_, _) -> + ok. diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index c16bd23cb..17166a014 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -54,14 +54,14 @@ run_command([Cmd | Args]) -> run_command(list_to_atom(Cmd), Args). -spec(run_command(cmd(), [string()]) -> ok | {error, term()}). -run_command(set, []) -> - emqx_mgmt_cli_cfg:set_usage(), ok; +% run_command(set, []) -> +% emqx_mgmt_cli_cfg:set_usage(), ok; -run_command(set, Args) -> - emqx_mgmt_cli_cfg:run(["config" | Args]), ok; +% run_command(set, Args) -> +% emqx_mgmt_cli_cfg:run(["config" | Args]), ok; -run_command(show, Args) -> - emqx_mgmt_cli_cfg:run(["config" | Args]), ok; +% run_command(show, Args) -> +% emqx_mgmt_cli_cfg:run(["config" | Args]), ok; run_command(help, []) -> usage(); diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 5a32b43c5..7e98eb37a 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -12,38 +12,85 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%% GC Utility functions. +%% @doc This module manages an opaque collection of statistics data used to +%% force garbage collection on `self()' process when hitting thresholds. +%% Namely: +%% (1) Total number of messages passed through +%% (2) Total data volume passed through +%% @end -module(emqx_gc). -%% Memory: (10, 100, 1000) -%% +-export([init/1, inc/2, reset/0]). --export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2, - maybe_force_gc/3]). +-type st() :: #{ cnt => {integer(), integer()} + , oct => {integer(), integer()} + }. --spec(conn_max_gc_count() -> integer()). -conn_max_gc_count() -> - case emqx_config:get_env(conn_force_gc_count) of - I when is_integer(I), I > 0 -> I + rand:uniform(I); - I when is_integer(I), I =< 0 -> undefined; - undefined -> undefined +-define(disabled, disabled). +-define(ENABLED(X), (is_integer(X) andalso X > 0)). + +%% @doc Initialize force GC parameters. +-spec init(false | map()) -> ok. +init(#{count := Count, bytes := Bytes}) -> + Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)], + Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)], + erlang:put(?MODULE, maps:from_list(Cnt ++ Oct)), + ok; +init(_) -> erlang:put(?MODULE, #{}), ok. + +%% @doc Increase count and bytes stats in one call, +%% ensure gc is triggered at most once, even if both thresholds are hit. +-spec inc(pos_integer(), pos_integer()) -> ok. +inc(Cnt, Oct) -> + mutate_pd_with(fun(St) -> inc(St, Cnt, Oct) end). + +%% @doc Reset counters to zero. +-spec reset() -> ok. +reset() -> + mutate_pd_with(fun(St) -> reset(St) end). + +%% ======== Internals ======== + +%% mutate gc stats numbers in process dict with the given function +mutate_pd_with(F) -> + St = F(erlang:get(?MODULE)), + erlang:put(?MODULE, St), + ok. + +%% Increase count and bytes stats in one call, +%% ensure gc is triggered at most once, even if both thresholds are hit. +-spec inc(st(), pos_integer(), pos_integer()) -> st(). +inc(St0, Cnt, Oct) -> + case do_inc(St0, cnt, Cnt) of + {true, St} -> + St; + {false, St1} -> + {_, St} = do_inc(St1, oct, Oct), + St end. --spec(reset_conn_gc_count(pos_integer(), tuple()) -> tuple()). -reset_conn_gc_count(Pos, State) -> - case element(Pos, State) of - undefined -> State; - _I -> setelement(Pos, State, conn_max_gc_count()) +%% Reset counters to zero. +reset(St) -> reset(cnt, reset(oct, St)). + +-spec do_inc(st(), cnt | oct, pos_integer()) -> {boolean(), st()}. +do_inc(St, Key, Num) -> + case maps:get(Key, St, ?disabled) of + ?disabled -> + {false, St}; + {Init, Remain} when Remain > Num -> + {false, maps:put(Key, {Init, Remain - Num}, St)}; + _ -> + {true, do_gc(St)} end. -maybe_force_gc(Pos, State) -> - maybe_force_gc(Pos, State, fun() -> ok end). -maybe_force_gc(Pos, State, Cb) -> - case element(Pos, State) of - undefined -> State; - I when I =< 0 -> Cb(), garbage_collect(), - reset_conn_gc_count(Pos, State); - I -> setelement(Pos, State, I - 1) +do_gc(St) -> + erlang:garbage_collect(), + reset(St). + +reset(Key, St) -> + case maps:get(Key, St, ?disabled) of + ?disabled -> St; + {Init, _} -> maps:put(Key, {Init, Init}, St) end. diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 421304f3a..d3ef43069 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -51,12 +51,12 @@ start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls -> %% Start MQTT/WS listener start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> - Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch); %% Start MQTT/WSS listener start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> - Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch). start_mqtt_listener(Name, ListenOn, Options) -> @@ -67,6 +67,9 @@ start_mqtt_listener(Name, ListenOn, Options) -> start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) -> Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). +mqtt_path(Options) -> + proplists:get_value(mqtt_path, Options, "/mqtt"). + ranch_opts(Options) -> NumAcceptors = proplists:get_value(acceptors, Options, 4), MaxConnections = proplists:get_value(max_connections, Options, 1024), diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 311bd58dd..91e5d4d59 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -22,7 +22,7 @@ -export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]). -export([set_headers/2]). -export([get_header/2, get_header/3, set_header/3]). --export([is_expired/1, check_expiry/1, check_expiry/2, update_expiry/1]). +-export([is_expired/1, update_expiry/1]). -export([format/1]). -type(flag() :: atom()). @@ -100,21 +100,6 @@ is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestam is_expired(_Msg) -> false. --spec(check_expiry(emqx_types:message()) -> {ok, pos_integer()} | expired | false). -check_expiry(Msg = #message{timestamp = CreatedAt}) -> - check_expiry(Msg, CreatedAt); -check_expiry(_Msg) -> - false. - --spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false). -check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) -> - case Interval - (elapsed(Since) div 1000) of - Timeout when Timeout > 0 -> {ok, Timeout}; - _ -> expired - end; -check_expiry(_Msg, _Since) -> - false. - update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> case elapsed(CreatedAt) of Elapsed when Elapsed > 0 -> @@ -138,4 +123,3 @@ format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); format(headers, Headers) -> io_lib:format("~p", [Headers]). - diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index e2b60a156..656b0fca3 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -15,7 +15,7 @@ -module(emqx_misc). -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, - proc_name/2, proc_stats/0, proc_stats/1]). + proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/1]). %% @doc Merge options -spec(merge_opts(list(), list()) -> list()). @@ -36,14 +36,13 @@ start_timer(Interval, Dest, Msg) -> erlang:start_timer(Interval, Dest, Msg). -spec(cancel_timer(undefined | reference()) -> ok). -cancel_timer(undefined) -> - ok; -cancel_timer(Timer) -> - case catch erlang:cancel_timer(Timer) of +cancel_timer(Timer) when is_reference(Timer) -> + case erlang:cancel_timer(Timer) of false -> receive {timeout, Timer, _} -> ok after 0 -> ok end; _ -> ok - end. + end; +cancel_timer(_) -> ok. -spec(proc_name(atom(), pos_integer()) -> atom()). proc_name(Mod, Id) -> @@ -59,3 +58,50 @@ proc_stats(Pid) -> {value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats), [{mailbox_len, V} | Stats1]. +-define(DISABLED, 0). + +%% @doc Check self() process status against connection/session process management policy, +%% return `continue | hibernate | {shutdown, Reason}' accordingly. +%% `continue': There is nothing out of the ordinary. +%% `hibernate': Nothing to process in my mailbox, and since this check is triggered +%% by a timer, we assume it is a fat chance to continue idel, hence hibernate. +%% `shutdown': Some numbers (message queue length or heap size have hit the limit), +%% hence shutdown for greater good (system stability). +-spec(conn_proc_mng_policy(#{message_queue_len := integer(), + total_heap_size := integer() + } | undefined) -> continue | hibernate | {shutdown, _}). +conn_proc_mng_policy(#{message_queue_len := MaxMsgQueueLen, + total_heap_size := MaxTotalHeapSize + }) -> + Qlength = proc_info(message_queue_len), + Checks = + [{fun() -> is_message_queue_too_long(Qlength, MaxMsgQueueLen) end, + {shutdown, message_queue_too_long}}, + {fun() -> is_heap_size_too_large(MaxTotalHeapSize) end, + {shutdown, total_heap_size_too_large}}, + {fun() -> Qlength > 0 end, continue}, + {fun() -> true end, hibernate} + ], + check(Checks); +conn_proc_mng_policy(_) -> + %% disable by default + conn_proc_mng_policy(#{message_queue_len => 0, total_heap_size => 0}). + +check([{Pred, Result} | Rest]) -> + case Pred() of + true -> Result; + false -> check(Rest) + end. + +is_message_queue_too_long(Qlength, Max) -> + is_enabled(Max) andalso Qlength > Max. + +is_heap_size_too_large(Max) -> + is_enabled(Max) andalso proc_info(total_heap_size) > Max. + +is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED. + +proc_info(Key) -> + {Key, Value} = erlang:process_info(self(), Key), + Value. + diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index a93fd8838..d9270dd5f 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -18,7 +18,7 @@ %% %% This module implements a simple in-memory queue for MQTT persistent session. %% -%% If the broker restarted or crashed, all the messages queued will be gone. +%% If the broker restarts or crashes, all queued messages will be lost. %% %% Concept of Message Queue and Inflight Window: %% @@ -29,12 +29,15 @@ %% |<--- Win Size --->| %% %% -%% 1. Inflight Window to store the messages delivered and awaiting for puback. +%% 1. Inflight Window is to store the messages +%% that are delivered but still awaiting for puback. %% -%% 2. Enqueue messages when the inflight window is full. +%% 2. Messages are enqueued to tail when the inflight window is full. %% -%% 3. If the queue is full, dropped qos0 messages if store_qos0 is true, -%% otherwise dropped the oldest one. +%% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true` +%% in init options +%% +%% 4. If the queue is full drop the oldest one unless `max_len' is set to `0'. %% %% @end diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index 276352797..762f5dc6d 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -17,7 +17,8 @@ -behaviour(gen_server). -export([start_link/0, start_link/2]). --export([submit/1, async_submit/1]). +-export([submit/1, submit/2]). +-export([async_submit/1, async_submit/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -25,6 +26,8 @@ -define(POOL, ?MODULE). +-type(task() :: fun() | mfa() | {fun(), Args :: list(any())}). + %% @doc Start pooler supervisor. start_link() -> emqx_pool_sup:start_link(?POOL, random, {?MODULE, start_link, []}). @@ -34,18 +37,35 @@ start_link() -> start_link(Pool, Id) -> gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). -%% @doc Submit work to the pool --spec(submit(fun()) -> any()). -submit(Fun) -> - gen_server:call(worker(), {submit, Fun}, infinity). +%% @doc Submit work to the pool. +-spec(submit(task()) -> any()). +submit(Task) -> + call({submit, Task}). -%% @doc Submit work to the pool asynchronously --spec(async_submit(fun()) -> ok). -async_submit(Fun) -> - gen_server:cast(worker(), {async_submit, Fun}). +-spec(submit(fun(), list(any())) -> any()). +submit(Fun, Args) -> + call({submit, {Fun, Args}}). +%% @private +call(Req) -> + gen_server:call(worker(), Req, infinity). + +%% @doc Submit work to the pool asynchronously. +-spec(async_submit(task()) -> ok). +async_submit(Task) -> + cast({async_submit, Task}). + +-spec(async_submit(fun(), list(any())) -> ok). +async_submit(Fun, Args) -> + cast({async_submit, {Fun, Args}}). + +%% @private +cast(Msg) -> + gen_server:cast(worker(), Msg). + +%% @private worker() -> - gproc_pool:pick_worker(pool). + gproc_pool:pick_worker(?POOL). %%------------------------------------------------------------------------------ %% gen_server callbacks @@ -55,15 +75,15 @@ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. -handle_call({submit, Fun}, _From, State) -> - {reply, catch run(Fun), State}; +handle_call({submit, Task}, _From, State) -> + {reply, catch run(Task), State}; handle_call(Req, _From, State) -> emqx_logger:error("[Pool] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({async_submit, Fun}, State) -> - try run(Fun) +handle_cast({async_submit, Task}, State) -> + try run(Task) catch _:Error:Stacktrace -> emqx_logger:error("[Pool] error: ~p, ~p", [Error, Stacktrace]) end, @@ -78,7 +98,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> - true = gproc_pool:disconnect_worker(Pool, {Pool, Id}). + gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -89,6 +109,8 @@ code_change(_OldVsn, State, _Extra) -> run({M, F, A}) -> erlang:apply(M, F, A); +run({F, A}) when is_function(F), is_list(A) -> + erlang:apply(F, A); run(Fun) when is_function(Fun) -> Fun(). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 8301cf014..15a287116 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -410,9 +410,17 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), process_packet(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); -process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) -> - %% Clean willmsg - {stop, normal, PState#pstate{will_msg = undefined}}; +process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), + PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) -> + case Interval =/= 0 andalso OldInterval =:= 0 of + true -> + deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), + {error, protocol_error, PState}; + false -> + emqx_session:update_expiry_interval(SPid, Interval), + %% Clean willmsg + {stop, normal, PState#pstate{will_msg = undefined}} + end; process_packet(?DISCONNECT_PACKET(_), PState) -> {stop, normal, PState}. @@ -481,22 +489,30 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, max_topic_alias := MaxAlias, mqtt_shared_subscription := Shared, mqtt_wildcard_subscription := Wildcard} = caps(PState), - Props = #{'Maximum-QoS' => MaxQoS, - 'Retain-Available' => flag(Retain), + Props = #{'Retain-Available' => flag(Retain), 'Maximum-Packet-Size' => MaxPktSize, 'Topic-Alias-Maximum' => MaxAlias, 'Wildcard-Subscription-Available' => flag(Wildcard), 'Subscription-Identifier-Available' => 1, 'Shared-Subscription-Available' => flag(Shared)}, - Props1 = if IsAssigned -> - Props#{'Assigned-Client-Identifier' => ClientId}; - true -> Props + + Props1 = if + MaxQoS =:= ?QOS_2 -> + Props; + true -> + maps:put('Maximum-QoS', MaxQoS, Props) + end, + + Props2 = if IsAssigned -> + Props1#{'Assigned-Client-Identifier' => ClientId}; + true -> Props1 end, - Props2 = case emqx_zone:get_env(Zone, server_keepalive) of - undefined -> Props1; - Keepalive -> Props1#{'Server-Keep-Alive' => Keepalive} + + Props3 = case emqx_zone:get_env(Zone, server_keepalive) of + undefined -> Props2; + Keepalive -> Props2#{'Server-Keep-Alive' => Keepalive} end, - send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props2), PState); + send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props3), PState); deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); @@ -562,17 +578,32 @@ maybe_assign_client_id(PState) -> PState. try_open_session(#pstate{zone = Zone, + proto_ver = ProtoVer, client_id = ClientId, conn_pid = ConnPid, conn_props = ConnProps, username = Username, clean_start = CleanStart}) -> - case emqx_sm:open_session(#{zone => Zone, - client_id => ClientId, - conn_pid => ConnPid, - username => Username, - clean_start => CleanStart, - conn_props => ConnProps}) of + + SessAttrs = #{ + zone => Zone, + client_id => ClientId, + conn_pid => ConnPid, + username => Username, + clean_start => CleanStart + }, + + case emqx_sm:open_session(maps:put(expiry_interval, if + ProtoVer =:= ?MQTT_PROTO_V5 -> + maps:get('Session-Expiry-Interval', ConnProps, 0); + true -> + case CleanStart of + true -> + 0; + false -> + emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) + end + end, SessAttrs)) of {ok, SPid} -> {ok, SPid, false}; Other -> Other diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 118ab6e21..14ed2a21d 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -47,6 +47,7 @@ -export([info/1, attrs/1]). -export([stats/1]). -export([resume/2, discard/2]). +-export([update_expiry_interval/2]). -export([subscribe/2, subscribe/4]). -export([publish/3]). -export([puback/2, puback/3]). @@ -313,6 +314,10 @@ resume(SPid, ConnPid) -> discard(SPid, ByPid) -> gen_server:call(SPid, {discard, ByPid}, infinity). +-spec(update_expiry_interval(spid(), timeout()) -> ok). +update_expiry_interval(SPid, Interval) -> + gen_server:cast(SPid, {expiry_interval, Interval * 1000}). + -spec(close(spid()) -> ok). close(SPid) -> gen_server:call(SPid, close, infinity). @@ -321,12 +326,12 @@ close(SPid) -> %% gen_server callbacks %%------------------------------------------------------------------------------ -init([Parent, #{zone := Zone, - client_id := ClientId, - username := Username, - conn_pid := ConnPid, - clean_start := CleanStart, - conn_props := ConnProps}]) -> +init([Parent, #{zone := Zone, + client_id := ClientId, + username := Username, + conn_pid := ConnPid, + clean_start := CleanStart, + expiry_interval := ExpiryInterval}]) -> process_flag(trap_exit, true), true = link(ConnPid), MaxInflight = get_env(Zone, max_inflight), @@ -346,22 +351,21 @@ init([Parent, #{zone := Zone, awaiting_rel = #{}, await_rel_timeout = get_env(Zone, await_rel_timeout), max_awaiting_rel = get_env(Zone, max_awaiting_rel), - expiry_interval = expire_interval(Zone, ConnProps), + expiry_interval = ExpiryInterval, enable_stats = get_env(Zone, enable_stats, true), deliver_stats = 0, enqueue_stats = 0, - created_at = os:timestamp()}, + created_at = os:timestamp() + }, emqx_sm:register_session(ClientId, attrs(State)), emqx_sm:set_session_stats(ClientId, stats(State)), emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), + GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), + ok = emqx_gc:init(GcPolicy), + erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)), ok = proc_lib:init_ack(Parent, {ok, self()}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). -expire_interval(_Zone, #{'Session-Expiry-Interval' := I}) -> - I * 1000; -expire_interval(Zone, _ConnProps) -> %% Maybe v3.1.1 - get_env(Zone, session_expiry_interval, 0). - init_mqueue(Zone) -> emqx_mqueue:init(#{type => get_env(Zone, mqueue_type, simple), max_len => get_env(Zone, max_mqueue_len, 1000), @@ -536,6 +540,9 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, %% Replay delivery and Dequeue pending messages noreply(dequeue(retry_delivery(true, State1))); +handle_cast({expiry_interval, Interval}, State) -> + {noreply, State#state{expiry_interval = Interval}}; + handle_cast(Msg, State) -> emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), {noreply, State}. @@ -567,21 +574,30 @@ handle_info({timeout, Timer, retry_delivery}, State = #state{retry_timer = Timer handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) -> noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined})); -handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) -> +handle_info({timeout, Timer, emit_stats}, + State = #state{client_id = ClientId, + stats_timer = Timer}) -> _ = emqx_sm:set_session_stats(ClientId, stats(State)), - {noreply, State#state{stats_timer = undefined}, hibernate}; - + NewState = State#state{stats_timer = undefined}, + Limits = erlang:get(force_shutdown_policy), + case emqx_misc:conn_proc_mng_policy(Limits) of + continue -> + {noreply, NewState}; + hibernate -> + ok = emqx_gc:reset(), %% going to hibernate, reset gc stats + {noreply, NewState, hibernate}; + {shutdown, Reason} -> + ?LOG(warning, "shutdown due to ~p", [Reason], NewState), + shutdown(Reason, NewState) + end; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> ?LOG(info, "expired, shutdown now:(", [], State), shutdown(expired, State); -handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = true, conn_pid = ConnPid}) -> - {stop, Reason, State#state{conn_pid = undefined}}; - handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) -> {stop, Reason, State#state{conn_pid = undefined}}; -handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start = false, conn_pid = ConnPid}) -> +handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> {noreply, ensure_expire_timer(State#state{conn_pid = undefined})}; handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) -> @@ -744,21 +760,22 @@ dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) -> end; %% Deliver qos0 message directly to client -dispatch(Msg = #message{qos = ?QOS0}, State) -> +dispatch(Msg = #message{qos = ?QOS0} = Msg, State) -> deliver(undefined, Msg, State), - inc_stats(deliver, State); + inc_stats(deliver, Msg, State); -dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight}) +dispatch(Msg = #message{qos = QoS} = Msg, + State = #state{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> case emqx_inflight:is_full(Inflight) of true -> enqueue_msg(Msg, State); false -> deliver(PacketId, Msg, State), - await(PacketId, Msg, inc_stats(deliver, next_pkt_id(State))) + await(PacketId, Msg, inc_stats(deliver, Msg, next_pkt_id(State))) end. enqueue_msg(Msg, State = #state{mqueue = Q}) -> - inc_stats(enqueue, State#state{mqueue = emqx_mqueue:in(Msg, Q)}). + inc_stats(enqueue, Msg, State#state{mqueue = emqx_mqueue:in(Msg, Q)}). %%------------------------------------------------------------------------------ %% Deliver @@ -775,7 +792,7 @@ redeliver({pubrel, PacketId}, #state{conn_pid = ConnPid}) -> deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = local}) -> ConnPid ! {deliver, {publish, PacketId, Msg}}; deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = remote}) -> - emqx_rpc:cast(node(ConnPid), erlang, send, [ConnPid, {deliver, PacketId, Msg}]). + emqx_rpc:cast(node(ConnPid), erlang, send, [ConnPid, {deliver, {publish, PacketId, Msg}}]). %%------------------------------------------------------------------------------ %% Awaiting ACK for QoS1/QoS2 Messages @@ -859,8 +876,8 @@ ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) -> ensure_retry_timer(_Timeout, State) -> State. -ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 -> - State#state{expiry_timer = emqx_misc:start_timer(Interval, expired)}; +ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 andalso Interval =/= 16#ffffffff -> + State#state{expiry_timer = emqx_misc:start_timer(Interval * 1000, expired)}; ensure_expire_timer(State) -> State. @@ -882,11 +899,19 @@ next_pkt_id(State = #state{next_pkt_id = Id}) -> %%------------------------------------------------------------------------------ %% Inc stats -inc_stats(deliver, State = #state{deliver_stats = I}) -> +inc_stats(deliver, Msg, State = #state{deliver_stats = I}) -> + MsgSize = msg_size(Msg), + ok = emqx_gc:inc(1, MsgSize), State#state{deliver_stats = I + 1}; -inc_stats(enqueue, State = #state{enqueue_stats = I}) -> +inc_stats(enqueue, _Msg, State = #state{enqueue_stats = I}) -> State#state{enqueue_stats = I + 1}. +%% Take only the payload size into account, add other fields if necessary +msg_size(#message{payload = Payload}) -> payload_size(Payload). + +%% Payload should be binary(), but not 100% sure. Need dialyzer! +payload_size(Payload) -> erlang:iolist_size(Payload). + %%------------------------------------------------------------------------------ %% Helper functions @@ -902,5 +927,3 @@ noreply(State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. -%% TODO: GC Policy and Shutdown Policy -%% maybe_gc(State) -> State. diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 0cbfab60a..02422fe55 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -26,7 +26,6 @@ -export([start_link/0]). --export([strategy/0]). -export([subscribe/3, unsubscribe/3]). -export([dispatch/3]). @@ -36,6 +35,7 @@ -define(SERVER, ?MODULE). -define(TAB, emqx_shared_subscription). +-define(ALIVE_SUBS, emqx_alive_shared_subscribers). -record(state, {pmon}). -record(emqx_shared_subscription, {group, topic, subpid}). @@ -62,9 +62,9 @@ mnesia(copy) -> start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec(strategy() -> round_robin | random | hash). +-spec(strategy() -> random | round_robin | sticky | hash). strategy() -> - emqx_config:get_env(shared_subscription_strategy, random). + emqx_config:get_env(shared_subscription_strategy, round_robin). subscribe(undefined, _Topic, _SubPid) -> ok; @@ -80,23 +80,56 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. -%% TODO: dispatch strategy, ensure the delivery... dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}) -> - case pick(subscribers(Group, Topic)) of + #message{from = ClientId} = Msg, + case pick(strategy(), ClientId, Group, Topic) of false -> Delivery; SubPid -> SubPid ! {dispatch, Topic, Msg}, Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]} end. -pick([]) -> - false; -pick([SubPid]) -> - SubPid; -pick(SubPids) -> - lists:nth(rand:uniform(length(SubPids)), SubPids). +pick(sticky, ClientId, Group, Topic) -> + Sub0 = erlang:get(shared_sub_sticky), + case is_sub_alive(Sub0) of + true -> + %% the old subscriber is still alive + %% keep using it for sticky strategy + Sub0; + false -> + %% randomly pick one for the first message + Sub = do_pick(random, ClientId, Group, Topic), + %% stick to whatever pick result + erlang:put(shared_sub_sticky, Sub), + Sub + end; +pick(Strategy, ClientId, Group, Topic) -> + do_pick(Strategy, ClientId, Group, Topic). + +do_pick(Strategy, ClientId, Group, Topic) -> + All = subscribers(Group, Topic), + pick_subscriber(Strategy, ClientId, All). + +pick_subscriber(_, _ClientId, []) -> false; +pick_subscriber(_, _ClientId, [Sub]) -> Sub; +pick_subscriber(Strategy, ClientId, Subs) -> + Nth = do_pick_subscriber(Strategy, ClientId, length(Subs)), + lists:nth(Nth, Subs). + +do_pick_subscriber(random, _ClientId, Count) -> + rand:uniform(Count); +do_pick_subscriber(hash, ClientId, Count) -> + 1 + erlang:phash2(ClientId) rem Count; +do_pick_subscriber(round_robin, _ClientId, Count) -> + Rem = case erlang:get(shared_sub_round_robin) of + undefined -> 0; + N -> (N + 1) rem Count + end, + _ = erlang:put(shared_sub_round_robin, Rem), + Rem + 1. subscribers(Group, Topic) -> ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). + %%----------------------------------------------------------------------------- %% gen_server callbacks %%----------------------------------------------------------------------------- @@ -104,6 +137,7 @@ subscribers(Group, Topic) -> init([]) -> {atomic, PMon} = mnesia:transaction(fun init_monitors/0), mnesia:subscribe({table, ?TAB, simple}), + ets:new(?ALIVE_SUBS, [named_table, {read_concurrency, true}, protected]), {ok, update_stats(#state{pmon = PMon})}. init_monitors() -> @@ -117,8 +151,9 @@ handle_call(Req, _From, State) -> {reply, ignored, State}. handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) -> - {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; - + NewPmon = emqx_pmon:monitor(SubPid, PMon), + ets:insert(?ALIVE_SUBS, {SubPid}), + {noreply, update_stats(State#state{pmon = NewPmon})}; handle_cast(Msg, State) -> emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]), {noreply, State}. @@ -154,6 +189,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- cleanup_down(SubPid) -> + ets:delete(?ALIVE_SUBS, SubPid), lists:foreach( fun(Record) -> mnesia:dirty_delete_object(?TAB, Record) @@ -162,3 +198,7 @@ cleanup_down(SubPid) -> update_stats(State) -> emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State. +%% erlang:is_process_alive/1 is expensive +%% and does not work with remote pids +is_sub_alive(Sub) -> [] =/= ets:lookup(?ALIVE_SUBS, Sub). + diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index b3b417717..6540bbef2 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -188,8 +188,8 @@ parse(Topic = <<"$share/", Topic1/binary>>, Options) -> case binary:split(Topic1, <<"/">>) of [<<>>] -> error({invalid_topic, Topic}); [_] -> error({invalid_topic, Topic}); - [Group, Topic2] -> - case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of + [Group, Topic2] -> + case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of nomatch -> {Topic2, maps:put(share, Group, Options)}; _ -> error({invalid_topic, Topic}) end diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index a08305a30..166d64a30 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -49,8 +49,18 @@ -define(PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, <<"publish">>)). +-define(PAYLOAD, [{type,"dsmSimulationData"}, + {id, 9999}, + {status, "running"}, + {soc, 1536702170}, + {fracsec, 451000}, + {data, lists:seq(1, 20480)}]). + +-define(BIG_PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, emqx_json:encode(?PAYLOAD))). + all() -> - [{group, connect}]. + [{group, connect}, + {group, publish}]. groups() -> [{connect, [non_parallel_tests], @@ -60,6 +70,10 @@ groups() -> mqtt_connect_with_ssl_oneway, mqtt_connect_with_ssl_twoway, mqtt_connect_with_ws + ]}, + {publish, [non_parallel_tests], + [ + packet_size ]}]. init_per_suite(Config) -> @@ -157,6 +171,21 @@ mqtt_connect_with_ws(_Config) -> {close, _} = rfc6455_client:close(WS), ok. +%%issue 1811 +packet_size(_Config) -> + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + Packet = raw_send_serialise(?CLIENT), + emqx_client_sock:send(Sock, Packet), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), + + %% Pub Packet QoS 1 + PubPacket = raw_send_serialise(?BIG_PUBPACKET), + emqx_client_sock:send(Sock, PubPacket), + {ok, Data1} = gen_tcp:recv(Sock, 0), + {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(Data1), + emqx_client_sock:close(Sock). + raw_send_serialise(Packet) -> emqx_frame:serialize(Packet). diff --git a/test/emqx_gc_tests.erl b/test/emqx_gc_tests.erl new file mode 100644 index 000000000..ffcac91d1 --- /dev/null +++ b/test/emqx_gc_tests.erl @@ -0,0 +1,53 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_gc_tests). + +-include_lib("eunit/include/eunit.hrl"). + +trigger_by_cnt_test() -> + Args = #{count => 2, bytes => 0}, + ok = emqx_gc:init(Args), + ok = emqx_gc:inc(1, 1000), + St1 = inspect(), + ?assertMatch({_, Remain} when Remain > 0, maps:get(cnt, St1)), + ok = emqx_gc:inc(2, 2), + St2 = inspect(), + ok = emqx_gc:inc(0, 2000), + St3 = inspect(), + ?assertEqual(St2, St3), + ?assertMatch({N, N}, maps:get(cnt, St2)), + ?assertNot(maps:is_key(oct, St2)), + ok. + +trigger_by_oct_test() -> + Args = #{count => 2, bytes => 2}, + ok = emqx_gc:init(Args), + ok = emqx_gc:inc(1, 1), + St1 = inspect(), + ?assertMatch({_, Remain} when Remain > 0, maps:get(oct, St1)), + ok = emqx_gc:inc(2, 2), + St2 = inspect(), + ?assertMatch({N, N}, maps:get(oct, St2)), + ?assertMatch({M, M}, maps:get(cnt, St2)), + ok. + +disabled_test() -> + Args = #{count => -1, bytes => false}, + ok = emqx_gc:init(Args), + ok = emqx_gc:inc(1, 1), + ?assertEqual(#{}, inspect()), + ok. + +inspect() -> erlang:get(emqx_gc). diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index c2ca0f0aa..37207e40c 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -29,7 +29,7 @@ all() -> message_flag, message_header, message_format, - message_expired + message_expired ]. message_make(_) -> @@ -53,7 +53,7 @@ message_flag(_) -> ?assert(emqx_message:get_flag(dup, Msg6)), ?assert(emqx_message:get_flag(retain, Msg6)). -message_header(_) -> +message_header(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg), Msg2 = emqx_message:set_header(c, 3, Msg1), @@ -68,11 +68,8 @@ message_expired(_) -> Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg), timer:sleep(500), ?assertNot(emqx_message:is_expired(Msg1)), - {ok, 1} = emqx_message:check_expiry(Msg1), timer:sleep(600), ?assert(emqx_message:is_expired(Msg1)), - expired = emqx_message:check_expiry(Msg1), timer:sleep(1000), Msg2 = emqx_message:update_expiry(Msg1), ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). - diff --git a/test/emqx_misc_tests.erl b/test/emqx_misc_tests.erl new file mode 100644 index 000000000..40a9aae87 --- /dev/null +++ b/test/emqx_misc_tests.erl @@ -0,0 +1,46 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_misc_tests). +-include_lib("eunit/include/eunit.hrl"). + +timer_cancel_flush_test() -> + Timer = emqx_misc:start_timer(0, foo), + ok = emqx_misc:cancel_timer(Timer), + receive {timeout, Timer, foo} -> error(unexpected) + after 0 -> ok + end. + +shutdown_disabled_test() -> + self() ! foo, + ?assertEqual(continue, conn_proc_mng_policy(0, 0)), + receive foo -> ok end, + ?assertEqual(hibernate, conn_proc_mng_policy(0, 0)). + +message_queue_too_long_test() -> + self() ! foo, + self() ! bar, + ?assertEqual({shutdown, message_queue_too_long}, + conn_proc_mng_policy(1, 0)), + receive foo -> ok end, + ?assertEqual(continue, conn_proc_mng_policy(1, 0)), + receive bar -> ok end. + +total_heap_size_too_large_test() -> + ?assertEqual({shutdown, total_heap_size_too_large}, + conn_proc_mng_policy(0, 1)). + +conn_proc_mng_policy(L, S) -> + emqx_misc:conn_proc_mng_policy(#{message_queue_len => L, + total_heap_size => S}). diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index 85114e3a9..4528239e6 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -16,14 +16,12 @@ -behaviour(gen_server). --export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0]). +-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {clean_start, client_id, client_pid}). - --define(TAB, messages). +-record(state, {clean_start, client_id, client_pid, last_msg}). start_link(ClientId) -> gen_server:start_link(?MODULE, [ClientId], []). @@ -37,52 +35,45 @@ close_session(ClientPid, SessPid) -> stop(CPid) -> gen_server:call(CPid, stop). -get_last_message() -> - [{last_message, Msg}] = ets:lookup(?TAB, last_message), - Msg. +get_last_message(Pid) -> + gen_server:call(Pid, get_last_message). init([ClientId]) -> - Result = lists:member(?TAB, ets:all()), - if Result == false -> - ets:new(?TAB, [set, named_table, public]); - true -> ok - end, - {ok, - #state{clean_start = true, - client_id = ClientId} + {ok, #state{clean_start = true, + client_id = ClientId, + last_msg = undefined + } }. handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> - Attrs = #{ zone => Zone, - client_id => ClientId, - conn_pid => ClientPid, - clean_start => true, - username => undefined, - conn_props => undefined + Attrs = #{ zone => Zone, + client_id => ClientId, + conn_pid => ClientPid, + clean_start => true, + username => undefined, + expiry_interval => 0 }, {ok, SessPid} = emqx_sm:open_session(Attrs), - {reply, {ok, SessPid}, State#state{ - clean_start = true, - client_id = ClientId, - client_pid = ClientPid - }}; - + {reply, {ok, SessPid}, + State#state{clean_start = true, + client_id = ClientId, + client_pid = ClientPid + }}; handle_call({stop_session, SessPid}, _From, State) -> emqx_sm:close_session(SessPid), {stop, normal, ok, State}; - +handle_call(get_last_message, _From, #state{last_msg = Msg} = State) -> + {reply, Msg, State}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; - handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. -handle_info({_, Msg}, State) -> - ets:insert(?TAB, {last_message, Msg}), - {noreply, State}; +handle_info({deliver, Msg}, State) -> + {noreply, State#state{last_msg = Msg}}; handle_info(_Info, State) -> {noreply, State}. diff --git a/test/emqx_pool_SUITE.erl b/test/emqx_pool_SUITE.erl new file mode 100644 index 000000000..3d7d0f7e5 --- /dev/null +++ b/test/emqx_pool_SUITE.erl @@ -0,0 +1,65 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_pool_SUITE). + +-compile(export_all). + +-compile(nowarn_export_all). + +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> [ + {group, submit_case}, + {group, async_submit_case} + ]. + +groups() -> + [ + {submit_case, [sequence], [submit_mfa, submit_fa]}, + {async_submit_case, [sequence], [async_submit_mfa]} + ]. + +init_per_suite(Config) -> + application:ensure_all_started(gproc), + Config. + +end_per_suite(_Config) -> + ok. + +submit_mfa(_Config) -> + erlang:process_flag(trap_exit, true), + {ok, Pid} = emqx_pool:start_link(), + Result = emqx_pool:submit({?MODULE, test_mfa, []}), + ?assertEqual(15, Result), + gen_server:stop(Pid, normal, 3000), + ok. + +submit_fa(_Config) -> + {ok, Pid} = emqx_pool:start_link(), + Fun = fun(X) -> case X rem 2 of 0 -> {true, X div 2}; _ -> false end end, + Result = emqx_pool:submit(Fun, [2]), + ?assertEqual({true, 1}, Result), + exit(Pid, normal). + +test_mfa() -> + lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]). + +async_submit_mfa(_Config) -> + {ok, Pid} = emqx_pool:start_link(), + emqx_pool:async_submit({?MODULE, test_mfa, []}), + exit(Pid, normal). + diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 29a6edc61..f79b84557 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -25,7 +25,7 @@ all() -> [t_session_all]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), Config. - + end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). @@ -40,7 +40,7 @@ t_session_all(_) -> [{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}), emqx_session:publish(SPid, 1, Message1), timer:sleep(200), - {publish, 1, _} = emqx_mock_client:get_last_message(), + {publish, 1, _} = emqx_mock_client:get_last_message(ConnPid), emqx_session:puback(SPid, 2), emqx_session:puback(SPid, 3, reasoncode), emqx_session:pubrec(SPid, 4), diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl new file mode 100644 index 000000000..8eb309001 --- /dev/null +++ b/test/emqx_shared_sub_SUITE.erl @@ -0,0 +1,198 @@ + +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_shared_sub_SUITE). + +-export([all/0, init_per_suite/1, end_per_suite/1]). +-export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]). + +-include("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). + +all() -> [t_random_basic, t_random, t_round_robin, t_sticky, t_hash, t_not_so_sticky]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +t_random_basic(_) -> + application:set_env(?APPLICATION, shared_subscription_strategy, random), + ClientId = <<"ClientId">>, + {ok, ConnPid} = emqx_mock_client:start_link(ClientId), + {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), + Message1 = emqx_message:make(<<"ClientId">>, 2, <<"foo">>, <<"hello">>), + emqx_session:subscribe(SPid, [{<<"foo">>, #{qos => 2, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000), + emqx_session:publish(SPid, 1, Message1), + ?wait(case emqx_mock_client:get_last_message(ConnPid) of + {publish, 1, _} -> true; + Other -> Other + end, 1000), + emqx_session:puback(SPid, 2), + emqx_session:puback(SPid, 3, reasoncode), + emqx_session:pubrec(SPid, 4), + emqx_session:pubrec(SPid, 5, reasoncode), + emqx_session:pubrel(SPid, 6, reasoncode), + emqx_session:pubcomp(SPid, 7, reasoncode), + emqx_mock_client:close_session(ConnPid, SPid), + ok. + +t_random(_) -> + test_two_messages(random). + +t_round_robin(_) -> + test_two_messages(round_robin). + +t_sticky(_) -> + test_two_messages(sticky). + +t_hash(_) -> + test_two_messages(hash). + +%% if the original subscriber dies, change to another one alive +t_not_so_sticky(_) -> + application:set_env(?APPLICATION, shared_subscription_strategy, sticky), + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1), + {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2), + {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal), + {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal), + Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), + emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}], 1000), + emqx_session:publish(SPid1, 1, Message1), + ?wait(case emqx_mock_client:get_last_message(ConnPid1) of + {publish, _, #message{payload = <<"hello1">>}} -> true; + Other -> Other + end, 1000), + emqx_mock_client:close_session(ConnPid1, SPid1), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [], 1000), + emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]), + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + emqx_session:publish(SPid2, 2, Message2), + ?wait(case emqx_mock_client:get_last_message(ConnPid2) of + {publish, _, #message{payload = <<"hello2">>}} -> true; + Other -> Other + end, 1000), + emqx_mock_client:close_session(ConnPid2, SPid2), + ?wait(ets:tab2list(emqx_alive_shared_subscribers) =:= [], 1000), + ok. + +test_two_messages(Strategy) -> + application:set_env(?APPLICATION, shared_subscription_strategy, Strategy), + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1), + {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2), + {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal), + {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal), + Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), + emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + emqx_session:subscribe(SPid2, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso + ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000), + emqx_session:publish(SPid1, 1, Message1), + Me = self(), + WaitF = fun(ExpectedPayload) -> + case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of + {true, Pid} -> + Me ! {subscriber, Pid}, + true; + Other -> + Other + end + end, + ?wait(WaitF(<<"hello1">>), 2000), + UsedSubPid1 = receive {subscriber, P1} -> P1 end, + %% publish both messages with SPid1 + emqx_session:publish(SPid1, 2, Message2), + ?wait(WaitF(<<"hello2">>), 2000), + UsedSubPid2 = receive {subscriber, P2} -> P2 end, + case Strategy of + sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2); + round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2); + hash -> ?assert(UsedSubPid1 =:= UsedSubPid2); + _ -> ok + end, + emqx_mock_client:close_session(ConnPid1, SPid1), + emqx_mock_client:close_session(ConnPid2, SPid2), + ok. + +last_message(_ExpectedPayload, []) -> <<"not yet?">>; +last_message(ExpectedPayload, [Pid | Pids]) -> + case emqx_mock_client:get_last_message(Pid) of + {publish, _, #message{payload = ExpectedPayload}} -> {true, Pid}; + _Other -> last_message(ExpectedPayload, Pids) + end. + +%%------------------------------------------------------------------------------ +%% help functions +%%------------------------------------------------------------------------------ + +wait_for(Fn, Ln, F, Timeout) -> + {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end), + wait_for_down(Fn, Ln, Timeout, Pid, Mref, false). + +wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) -> + receive + {'DOWN', Mref, process, Pid, normal} -> + ok; + {'DOWN', Mref, process, Pid, {C, E, S}} -> + erlang:raise(C, {Fn, Ln, E}, S) + after + Timeout -> + case Kill of + true -> + erlang:demonitor(Mref, [flush]), + erlang:exit(Pid, kill), + erlang:error({Fn, Ln, timeout}); + false -> + Pid ! stop, + wait_for_down(Fn, Ln, Timeout, Pid, Mref, true) + end + end. + +wait_loop(_F, true) -> exit(normal); +wait_loop(F, LastRes) -> + Res = catch_call(F), + receive + stop -> erlang:exit(LastRes) + after + 100 -> wait_loop(F, Res) + end. + +catch_call(F) -> + try + case F() of + true -> true; + Other -> erlang:error({unexpected, Other}) + end + catch + C : E : S -> + {C, E, S} + end. + diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 5e23100a9..6f9b92399 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -25,7 +25,7 @@ t_open_close_session(_) -> emqx_ct_broker_helpers:run_setup_steps(), {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid, - zone => internal, username => <<"zhou">>, conn_props => #{}}, + zone => internal, username => <<"zhou">>, expiry_interval => 0}, {ok, SPid} = emqx_sm:open_session(Attrs), [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>), SPid = emqx_sm:lookup_session_pid(<<"client">>),