Compare commits

...

165 Commits

Author SHA1 Message Date
turtled 0e97dfff29 Add baidian business 2020-01-06 18:46:29 +08:00
JianBo He d409e25e76 Fix dequeue error 2020-01-06 16:29:18 +08:00
turtled c1f8d7de2f Fix typo 2019-11-28 17:27:49 +08:00
zhouzb 76e6ded825 Fix typo 2019-11-23 10:51:31 +08:00
tigercl d730b24494
Avoid emqx crashes due to auth plugins (#3048)
Avoid emqx crashes due to auth plugins
2019-11-23 10:12:01 +08:00
tigercl 978153d993
Merge pull request #3049 from emqx/message_order
Ordered messaging via multiple gen_rpc clients
2019-11-23 10:11:19 +08:00
zhouzb b47a4f4422 Ordered messaging via multiple gen_rpc clients 2019-11-22 13:42:44 +08:00
zhouzb 7124a40f3d Optimize code 2019-11-21 16:26:04 +08:00
turtled 614b836983 Update pem certs 2019-11-15 15:43:48 +08:00
terry-xiaoyu 85d81a84d0 Update pem certs 2019-11-15 12:03:39 +08:00
zhouzb 243e6b3571 Return ok rather than ignore 2019-11-14 23:54:16 +08:00
tigercl ad5b954f1d
Merge pull request #3011 from emqx/max_subscriptions
Fix max_subscritpions
2019-11-01 21:04:43 +08:00
zhouzb 21293140d3 Fix max_subscritpions 2019-11-01 16:27:03 +08:00
terry-xiaoyu 1150e06d18 Add testcase for emqx_logger_formatter 2019-10-21 18:32:14 +08:00
turtleDeng b8cf9e14de
Merge pull request #2957 from emqx/upgrade_fail
Defend the ssl upgrade failure
2019-10-09 09:35:22 +08:00
terry-xiaoyu dbea5df7dc Defend the ssl upgrade failure 2019-10-08 17:40:35 +08:00
turtleDeng ac93725f74 Add emqx.app.src.script file (#2923) 2019-09-21 11:04:13 +08:00
turtleDeng 1574d50387
Merge pull request #2922 from emqx/fix_max_subs
Fix max_subscription not working
2019-09-21 11:01:36 +08:00
JianBo He 0a226ca4bc Fix max_subscription not working 2019-09-21 10:12:35 +08:00
turtleDeng 9c2584607a Optimize GC (#2917) 2019-09-20 16:55:12 +08:00
Michael Schmidt 2d99a1412e Add support for logger per module filtering (#2873)
Add mfa to the meta data to suppor the Erlang Logger's per module / per app filtering
2019-09-06 10:09:10 +08:00
turtleDeng 6f4b8d637b
Merge pull request #2870 from emqx/fix_bug
Fix credentials is null bug
2019-09-04 16:19:49 +08:00
turtled 0943129287 Fix credentials is null bug 2019-09-04 16:19:03 +08:00
turtleDeng 0908974017 Fix reload zone fail bug (#2857) 2019-08-31 15:03:40 +08:00
turtleDeng 19a8f0cbf8 Check max clients (#2859) 2019-08-31 14:27:13 +08:00
tigercl b4bbfad415
Fix will retain checking (#2820)
Fix will retain checking and handle the retained flag correctly
2019-08-22 16:12:14 +08:00
turtleDeng 68f6a43492
Merge pull request #2808 from emqx/invalid_cpu_alarm
Don't check cpu util on Alpine that uses libc-musl
2019-08-19 17:05:33 +08:00
zhouzb ac1e10bb60 Don't check cpu util on Alpine that uses libc-musl 2019-08-19 17:04:22 +08:00
Shawn 31671f5ee5 Fix Message-Expiry-Interval not work (#2791) 2019-08-16 17:50:37 +08:00
turtleDeng ffef64a803
Merge pull request #2751 from emqx/reliable_tracer
Make emqx_tracer more reliable
2019-08-16 17:46:27 +08:00
turtleDeng 58ba22dfc7
Merge pull request #2758 from emqx/merge_3.3
Fix unmount crash
2019-08-12 09:37:15 +08:00
turtled 0e82170ed5 Fix Typo 2019-08-06 17:26:27 +08:00
turtled 2c73ab9713 Add qos case 2019-08-06 14:59:59 +08:00
turtled 997b693400 Fix unmount crash 2019-08-06 14:16:36 +08:00
terry-xiaoyu 1a5d8ca3fd Make emqx_tracer more reliable
Remove the emqx_tracer process as the trace-handlers stored in state would be lost in case of crash.
2019-08-04 01:01:37 +08:00
turtleDeng be2ce93a2c
Change log level (#2739)
* Change log level
2019-07-31 21:56:28 +08:00
turtled 90ace6a331 Update ekka tag 2019-07-31 17:56:10 +08:00
turtled 4aff37772f Fix conflicts 2019-07-31 16:43:26 +08:00
Gilbert 0daa703193
Update gen_rpc tag (#2716) 2019-07-23 14:47:45 +08:00
Shawn 155eb82283
Merge pull request #2710 from emqx/master
merge from master to release-3.2
2019-07-20 19:15:44 +08:00
Shawn 3410151c24
Merge pull request #2709 from emqx/develop 2019-07-20 19:08:57 +08:00
terry-xiaoyu 43cf0fbab4 Fix heartbeat interval 2019-07-20 16:23:01 +08:00
terry-xiaoyu fa7292560a Fix heartbeat interval 2019-07-20 16:21:44 +08:00
turtleDeng bffca305c1
Merge pull request #2701 from emqx/master
Auto-pull-request-by-2019-07-20
2019-07-20 14:41:44 +08:00
aruldd a6210f7142 updated the location of built emqx 2019-07-20 14:32:25 +08:00
turtleDeng 2fc3e560d2
Merge pull request #2705 from emqx/develop
Fix session termiated without ws_channel
2019-07-20 14:25:01 +08:00
zhanghongtong 6ec7cb5090 Merge remote-tracking branch 'origin/develop' 2019-07-20 01:01:52 +08:00
turtleDeng 4d14d51dcb
Merge pull request #2691 from emqx/master
Fix the websocket normal exit bug
2019-07-16 01:42:26 +08:00
turtleDeng def758152d
Merge pull request #2690 from emqx/develop
Fix websoskct close BUG
2019-07-16 01:41:40 +08:00
turtleDeng 1c17d514aa
Merge pull request #2687 from emqx/master
Auto-pull-request-by-2019-07-12
2019-07-11 18:08:31 +08:00
zhanghongtong b8ce42fd29 Merge remote-tracking branch 'origin/develop' 2019-07-11 17:46:35 +08:00
zhanghongtong 47d628f9d9 Merge remote-tracking branch 'origin/develop' 2019-07-11 10:17:13 +08:00
turtleDeng c9498d7986
Merge pull request #2673 from emqx/master
Auto-pull-request-by-2019-07-06
2019-07-06 14:47:01 +08:00
zhanghongtong 19fdf3b77a Merge remote-tracking branch 'origin/develop' 2019-07-06 11:29:48 +08:00
turtleDeng 1a2592b13c
Merge pull request #2662 from emqx/master
Auto-pull-request-by-2019-06-30
2019-06-29 18:53:42 +08:00
zhanghongtong 86feae6adc Merge remote-tracking branch 'origin/develop' 2019-06-29 18:38:45 +08:00
zhanghongtong 5835c06745 Merge remote-tracking branch 'origin/develop' 2019-06-26 04:22:32 +08:00
zhanghongtong 7c3e5d765c Merge remote-tracking branch 'origin/develop' 2019-06-25 04:17:20 +08:00
turtleDeng 872c0af3fd
Merge pull request #2649 from emqx/master
Auto-pull-request-by-2019-06-22
2019-06-22 09:51:46 +08:00
zhanghongtong 411f8a0ec7 Merge remote-tracking branch 'origin/develop' 2019-06-22 09:24:41 +08:00
zhanghongtong e76451000c Merge remote-tracking branch 'origin/develop' 2019-06-20 17:30:36 +08:00
zhanghongtong 65ae10a651 Merge remote-tracking branch 'origin/develop' 2019-06-20 10:49:01 +08:00
zhanghongtong b9715b9e71 Merge remote-tracking branch 'origin/develop' 2019-06-19 04:16:00 +08:00
Shawn 0625bfe3f8
Merge pull request #2625 from emqx/master
Auto-pull-request-by-2019-06-15
2019-06-14 20:01:59 +08:00
zhanghongtong 6b43acc1c1 Merge remote-tracking branch 'origin/develop' 2019-06-14 18:24:56 +08:00
zhanghongtong 854a48d77c Merge remote-tracking branch 'origin/develop' 2019-06-13 14:35:14 +08:00
turtleDeng ea6d9f12d9
Merge pull request #2603 from emqx/master
Auto-pull-request-by-2019-06-06
2019-06-06 16:07:11 +08:00
zhanghongtong 5025b2f65d Merge remote-tracking branch 'origin/develop' 2019-06-06 13:47:12 +08:00
zhanghongtong 97f9e7123d Merge remote-tracking branch 'origin/develop' 2019-06-04 11:17:04 +08:00
zhanghongtong f0a434739d Merge remote-tracking branch 'origin/develop' 2019-06-02 04:06:51 +08:00
zhanghongtong 076776d7d4 Merge remote-tracking branch 'origin/develop' 2019-05-31 04:06:43 +08:00
zhanghongtong 44d901eb1e Merge remote-tracking branch 'origin/develop' 2019-05-29 11:03:25 +08:00
turtleDeng 297a385def
Merge pull request #2573 from emqx/develop
Merge the develop branch into the master
2019-05-27 10:18:49 +08:00
zhanghongtong bd664ae370 Merge remote-tracking branch 'origin/develop' 2019-05-09 10:50:29 +08:00
zhanghongtong 21c18d15d4 Merge remote-tracking branch 'origin/develop' 2019-04-26 03:55:02 +08:00
zhanghongtong af6ad8a90f Merge remote-tracking branch 'origin/develop' 2019-04-25 03:56:46 +08:00
zhanghongtong a2a8e42880 Merge remote-tracking branch 'origin/develop' 2019-04-20 03:54:47 +08:00
zhanghongtong 0f1b678126 Merge remote-tracking branch 'origin/develop' 2019-04-19 10:17:23 +08:00
zhanghongtong 1fbc50530a Merge remote-tracking branch 'origin/develop' 2019-04-18 03:53:43 +08:00
zhanghongtong 0c104faef7 Merge remote-tracking branch 'origin/develop' 2019-04-13 14:48:50 +08:00
zhanghongtong 8662f55c4c Merge remote-tracking branch 'origin/develop' 2019-04-12 03:53:43 +08:00
zhanghongtong cc43da0fd5 Merge remote-tracking branch 'origin/develop' 2019-04-11 12:50:31 +08:00
zhanghongtong e6ccbc601c Merge remote-tracking branch 'origin/develop' 2019-04-10 03:53:44 +08:00
zhanghongtong 22084025e6 Merge remote-tracking branch 'origin/develop' 2019-04-09 03:52:46 +08:00
zhanghongtong 2b1a2f5e13 Merge remote-tracking branch 'origin/develop' 2019-04-05 03:51:52 +08:00
zhanghongtong 2164c9149b Merge remote-tracking branch 'origin/develop' 2019-04-04 11:05:42 +08:00
zhanghongtong 21e31ab1c8 Merge remote-tracking branch 'origin/develop' 2019-04-03 04:24:31 +08:00
zhanghongtong 1aa30cba89 Merge remote-tracking branch 'origin/develop' 2019-04-02 09:15:04 +08:00
zhanghongtong 79e37cba0d Merge remote-tracking branch 'origin/develop' 2019-04-01 09:14:28 +08:00
zhanghongtong 127427b783 Merge remote-tracking branch 'origin/develop' 2019-03-30 09:19:45 +08:00
zhanghongtong ff0fd66725 Merge remote-tracking branch 'origin/develop' 2019-03-29 04:21:45 +08:00
zhanghongtong 5a4adefb16 Merge remote-tracking branch 'origin/develop' 2019-03-28 13:38:36 +08:00
zhanghongtong 24b4d83c12 Merge remote-tracking branch 'origin/develop' 2019-03-27 04:31:41 +08:00
zhanghongtong 3d45da8e03 Merge remote-tracking branch 'origin/develop' 2019-03-23 04:22:32 +08:00
zhanghongtong 962fb0cec5 Merge remote-tracking branch 'origin/develop' 2019-03-22 04:16:15 +08:00
zhanghongtong dac1b92d8f Merge remote-tracking branch 'origin/develop' 2019-03-21 04:14:44 +08:00
zhanghongtong 5fb4f23504 Merge remote-tracking branch 'origin/develop' 2019-03-19 09:51:37 +08:00
zhanghongtong 5f53952b45 Merge remote-tracking branch 'origin/develop' 2019-03-16 09:33:04 +08:00
zhanghongtong e349136cb2 Merge remote-tracking branch 'origin/develop' 2019-03-14 12:09:18 +08:00
zhanghongtong 28bd9a7fda Merge remote-tracking branch 'origin/develop' 2019-03-13 04:15:15 +08:00
zhanghongtong c4bf5aa34c Merge remote-tracking branch 'origin/develop' 2019-03-11 18:29:31 +08:00
zhanghongtong f837ac47e7 Merge remote-tracking branch 'origin/develop' 2019-02-28 11:30:54 +08:00
zhanghongtong b20e87f98e Merge remote-tracking branch 'origin/develop' 2019-02-25 09:33:02 +08:00
Gilbert Wong ee9f278738 Merge branch 'develop' 2019-02-21 16:09:43 +08:00
Feng Lee 8751c10ea5 Update README 2019-02-11 15:44:45 +08:00
Feng Lee 891ef2680e Merge branch 'emqx30' 2019-01-25 15:54:11 +08:00
Gilbert b461e26f25
Reload config (#2180)
Reload config when restart or reboot emqx
2019-01-24 23:55:57 +08:00
Feng Lee d5b17c516e Improve the 'try_open_session' function 2019-01-24 11:10:33 +08:00
Feng Lee bb9c41c9f0 Update rebar.config 2019-01-23 11:29:09 +08:00
Feng Lee 88dbbc3a44 Upgrade ekka, esockd libraries 2019-01-23 11:29:09 +08:00
spring2maz a6f138b55c Improve shared sub dispatch implementation. (#2144)
Before this change, when shared dispatch ack is enabled (in config)
in case all subscribers are offline (all sessions gave negative ack)
the message is simply discarded.
In this change, it is ensured to have one session picked according to
configured dispatch strategy when no subscriber is online.
The messages dispatched in such scenario are then queued in session state.
2019-01-22 09:57:37 +08:00
Gilbert 55ec358cd6 Fix bridge bug (#2160)
* Fix bridge bug

* Fix ack bug

* Limit bridge QoS less than 1
2019-01-22 09:42:32 +08:00
Gilbert 067d28dcb6 Change the reason code in will topic acl check (#2168)
This chang the reason code to not authorized code.
2019-01-21 09:49:29 +08:00
Gilbert 118e67a8ca Fix the rebar-xref error (#2108)
Prior to this change, @emqplus  merged emqx30 to master branch and it
cause some problems. For example, there are still some codes using
lager to print log and there is one local function which is not been
used by other function.

This change fix this issue.
2018-12-28 09:22:26 +08:00
Feng Lee ff9fccdb07 Merge branch 'emqx30' 2018-12-26 13:14:59 +08:00
turtleDeng 87f12256ca
Merge pull request #1684 from emqtt/emq24
Version 2.3.11
2018-07-22 15:41:20 +08:00
turtleDeng 9c1421f4b8
Merge pull request #1683 from emqtt/develop
Fix docker compile fail
2018-07-22 15:40:43 +08:00
turtled e58b0fc1db Fix docker compile fail 2018-07-22 15:39:52 +08:00
turtleDeng cd8dacabd6
Merge pull request #1682 from emqtt/emq24
Version 2.3.11
2018-07-21 22:47:36 +08:00
turtleDeng dc76d7ef93
Merge pull request #1681 from emqtt/develop
Version 2.3.11
2018-07-21 22:46:56 +08:00
turtled be76a6e50a Version 2.3.11 2018-07-21 22:46:06 +08:00
turtleDeng b55e76001a
Merge pull request #1680 from emqtt/emq24
Version 2.3.11
2018-07-21 22:45:08 +08:00
turtleDeng 0adc8b39af
Merge pull request #1664 from emqtt/develop
Support to start listener on specified port when emqttd is runnning
2018-07-21 22:43:53 +08:00
turtled d191ef0303 Version 2.3.11 2018-07-20 16:21:40 +08:00
Gilbert Wong 4aa50f0f6e Change listenon_tokens to parse_listenon 2018-07-06 17:16:49 +08:00
Gilbert Wong 9433e563fb support to start listeners when emqttd is running 2018-07-05 19:19:27 +08:00
Gilbert aa6ae3ad8a
Merge pull request #1639 from Gilbert-Wong/issue#1614
fix bug for issue#1614
2018-06-30 22:52:43 +08:00
Gilbert 8d6a78ce8e
Merge pull request #1658 from Gilbert-Wong/develop
fix bugs about rest api which getting config lists cause errors
2018-06-30 22:48:44 +08:00
Gilbert Wong cad0f1a858 fix bugs about rest api which getting config lists cause errors 2018-06-30 22:23:35 +08:00
huangdan b7abf7ca7a
Merge pull request #1650 from emqtt/emq24
Version 2.3.10
2018-06-26 11:05:59 +08:00
huangdan 3c6930b849
Merge pull request #1649 from emqtt/develop
Version 2.3.10
2018-06-26 10:05:07 +08:00
HuangDan ed1f428ef6 Version 2.3.10 2018-06-22 13:24:31 +08:00
Gilbert Wong d7e46b367e fix bug for issue#1614 2018-06-14 09:56:49 +08:00
HuangDan de319fa29a Merge branch 'emq24' 2018-05-19 18:35:54 +08:00
HuangDan d998728847 Merge branch 'develop' into emq24 2018-05-19 18:35:20 +08:00
HuangDan f9dde2f049 Version 2.3.9 2018-05-19 18:34:39 +08:00
HuangDan 609968dd31 Check params for REST publish API https://github.com/emqtt/emqttd/issues/1599 2018-05-18 15:56:29 +08:00
HuangDan 0bcc692071 Merge branch 'emq24' 2018-05-11 13:34:13 +08:00
HuangDan 1e5241f401 Merge branch 'develop' into emq24 2018-05-11 12:58:41 +08:00
HuangDan 83cfcc5d2f Version 2.3.8 2018-05-11 12:57:55 +08:00
huangdan 010e1fa9a8
Merge pull request #1588 from terry-xiaoyu/issue_emqttd_ctl
fix emqttd_ctl crashed when emq_auth_usename doesn't exist
2018-05-11 10:30:22 +08:00
huangdan b9378710d8
Merge pull request #1591 from terry-xiaoyu/issue-1590
#1590 error log when change CleanSession
2018-05-11 10:28:43 +08:00
terry-xiaoyu 69665be6f8 #1590 error log when change CleanSession 2018-05-07 20:35:12 +08:00
terry-xiaoyu 849f0aaef7 fix emqttd_ctl crashed when emq_auth_usename doesn't exist 2018-05-04 17:28:16 +08:00
huangdan 1da9e4397e
Merge pull request #1574 from emqtt/emq24
Version 2.3.7
2018-04-21 23:19:53 +08:00
huangdan f5b77f2e7b
Merge pull request #1558 from emqtt/develop
Version 2.3.7
2018-04-21 15:55:26 +08:00
HeeeJianBo e7e8131f2a Align code 2018-04-21 11:39:19 +08:00
turtled f717b734c3 Update ekka 0.2.3 2018-04-21 11:17:27 +08:00
turtled 5b0c752181 2.3.7 2018-04-21 10:44:29 +08:00
turtled 92f3036f1f Client sub/unsub mount topic 2018-04-21 10:43:43 +08:00
turtleDeng 58e3555e9b
Merge pull request #1564 from terry-xiaoyu/develop
fix #1562 dup flag not set when re-deliver
2018-04-21 10:41:34 +08:00
turtleDeng e79c714ec4
Merge pull request #1572 from callbay/devel
Devel
2018-04-21 10:00:21 +08:00
turtleDeng 2ee18ddebc
Merge pull request #1567 from terry-xiaoyu/clean_dead_session
clean dead persistent session on connect
2018-04-21 09:58:40 +08:00
Frank Feng 490ac8f449 match {error,Reason} 2018-04-20 14:33:13 +08:00
Frank Feng 27fcb73483 fix spec of function setstats/3 2018-04-20 14:07:41 +08:00
terry-xiaoyu 5039b751f4 clean dead persistent session on connect 2018-04-17 17:08:32 +08:00
turtled f13654dbce Support set k8s namespace 2018-04-17 11:39:03 +08:00
terry-xiaoyu 18100cacf9 fix #1562 dup flag not set when re-deliver
The dup flag is not set when redeliver the PUBLISH messages for QoS1 and QoS2
2018-04-13 16:00:46 +08:00
Feng Lee ad5ece8c33 Upgrade the lager_console_backend config 2018-04-09 14:56:33 +08:00
J Phani Mahesh 1f842e4a19 change log level: warning -> info on stop publish
Message publishes may be prevented by hooks by returning `{stop, Msg}`
or variants. However, this causes a log message at warning level.

Since a hook can potentially prevent multiple messages rapidly, these
warning messages pollute the logs. Similar level of detail is present
currently at info level, so demoting this message to info.
2018-03-27 21:20:26 +03:00
huangdan 8822468528
Merge pull request #1541 from emqtt/emq24
Version 2.3.6
2018-03-24 00:51:43 +08:00
huangdan 3c0294eafe
Merge pull request #1540 from emqtt/develop
Version 2.3.6
2018-03-23 20:57:30 +08:00
HuangDan e722e0f6d1 Version 2.3.6 2018-03-23 20:56:50 +08:00
huangdan ebbba23276
Merge pull request #1539 from emqtt/develop
Add LWT message ACL
2018-03-23 20:54:49 +08:00
turtled 61567c0fcc Add LWT message ACL 2018-03-12 20:16:20 +08:00
33 changed files with 550 additions and 393 deletions

View File

@ -28,7 +28,7 @@ git clone https://github.com/emqx/emqx-rel.git
cd emqx-rel && make cd emqx-rel && make
cd _rel/emqx && ./bin/emqx console cd _build/emqx/rel/emqx && ./bin/emqx console
``` ```

View File

@ -1,17 +1,18 @@
-----BEGIN CERTIFICATE----- -----BEGIN CERTIFICATE-----
MIICxjCCAa6gAwIBAgIJAJk1DbZBu8FDMA0GCSqGSIb3DQEBCwUAMBMxETAPBgNV MIIC0TCCAbmgAwIBAgIUDQN8HojZmyEV9+AzEz6j6juwThswDQYJKoZIhvcNAQEL
BAMMCE15VGVzdENBMB4XDTE3MTEwMjEzNDI0N1oXDTE5MTEwMjEzNDI0N1owEzER BQAwEzERMA8GA1UEAwwITXlUZXN0Q0EwHhcNMTkxMTE1MDcyNjU4WhcNMjkxMTEy
MA8GA1UEAwwITXlUZXN0Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB MDcyNjU4WjATMREwDwYDVQQDDAhNeVRlc3RDQTCCASIwDQYJKoZIhvcNAQEBBQAD
AQDshDho6ef1JClDJ24peSsXdFnFO3xIB7+BSp1YPcOvmRECKUG0mLORw3hNm15m ggEPADCCAQoCggEBALce8QYBpl7fxEhwW0wtBQygXisMcPTKzckz3RhU21TeqK1Z
8eGOn1iLGE/xKlaZ74/xjyq8f7qIGZCmvZj59m+eiJCAmy8SiUJZtSVoOlOzepJd 6Fm03QyYvB239oYJLodVwzv5SNI75hZ43Vyp+SHt3M3DjcsU/8PflxFK4QR7TdhI
PoDgcBvDKA4ogZ3iJHMUNI3EdlD6nrKEJF2qe2JUrL0gv65uo2/N7XVNvE87Dk3J ddn6R59Gqt0MhAZ/df2dYt7cMaQV8/5plzxLvrv9X2fwo8BYAGp6g6wGAL8SJDT9
83KyCAmeu+x+moS1ILnjs2DuPEGSxZqzf7IQMbXuNWJYAOZg9t4Fg0YjTiAaWw3G jd9TGzBG/o3dLu3keEwcl0CMq3qUwxatBHMe2s7COKBrngD/CvRAL8tG3VTj7ep9
JKAoMY4tI3JCqlvwGR4lH7kfk3WsD4ofGlFhxU4nEG0xgnJl8BcoJWD1A2RjGe1f n29SSS8qMzHhJdBahTDrYS+SeW61iFK1yLXSxCWNoMB0/g7/AktWuAXHdHRX9xaf
qCijqPSe93l2wt8OpbyHzwc7AgMBAAGjHTAbMAwGA1UdEwQFMAMBAf8wCwYDVR0P WNJ4RdoPxhqkVJ8SrC4JtC8ah6DchVysWnz2KwMCAwEAAaMdMBswDAYDVR0TBAUw
BAQDAgEGMA0GCSqGSIb3DQEBCwUAA4IBAQAi+t5jBrMxFzoF76kyRd3riNDlWp0w AwEB/zALBgNVHQ8EBAMCAQYwDQYJKoZIhvcNAQELBQADggEBAEgnPnHLdivykReJ
NCewkohBkwBHsQfHzSnc6c504jdyzkEiD42UcI8asPsJcsYrQ+Uo6OBn049u49Wn I8xf5DeWsgBUdVvhxz2E9Ole/u6ThulNLziwHernkTprskiKFJaF67ZzS7YddTdf
zcSERVSVec1/TAPS/egFTU9QMWtPSAm8AEaQ6YYAuiwOLCcC+Cm/a3e3dWSRWt8o WsS0H5LhYaft5NnBcn9UHCKEycyr3AJZ6joB3Dd9CfMQEscnZHNmIXwPGxw4bYP6
LqKX6CWTlmKWe182MhFPpZYxZQLGapti4R4mb5QusUbc6tXbkcX82GjDPTOuAw7b AElF0Iy7LY/Z8po/UACTBzCCSf5UkZ9Jy/rzxuvn/cfPcLNhDWk8b8MbmOfuyNPV
mWpzVd5xnlp7Vz+50u+YaAYUmCobg0hR/AuTrA4GDMlgzTnuZQhF6o8iVkypXOtS SfPGn7wXIt9iyyA4qyzEVMaXl8d94E48dV5Fc1sQEEo6gk16dQ9p64ePMvUih6an
Ufz6X3tVVErVVc7UUfzSnupHj1M2h4rzlQ3oqHoAEnXcJmV4f/Pf/6FW kSz9X/n1+9sHq54pJmLZ2gfRvGPIPVIipSjAj4sjHvKzuC3CQTTXs9HzmN2nT0zx
gLxgEkY=
-----END CERTIFICATE----- -----END CERTIFICATE-----

View File

@ -1,18 +1,18 @@
-----BEGIN CERTIFICATE----- -----BEGIN CERTIFICATE-----
MIIC6jCCAdKgAwIBAgIBATANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
c3RDQTAeFw0xNzExMDIxMzQyNDhaFw0xOTExMDIxMzQyNDhaMC0xGjAYBgNVBAMT c3RDQTAeFw0xOTExMTUwNzI2NThaFw0yMTExMTQwNzI2NThaMCoxFzAVBgNVBAMM
EU1hY0Jvb2stQWlyLmxvY2FsMQ8wDQYDVQQKEwZzZXJ2ZXIwggEiMA0GCSqGSIb3 DjAwMDQubm92YWxvY2FsMQ8wDQYDVQQKDAZzZXJ2ZXIwggEiMA0GCSqGSIb3DQEB
DQEBAQUAA4IBDwAwggEKAoIBAQDUO/kL3ar3WsopPF12qAf+cwDHklGJIxJsjdoZ AQUAA4IBDwAwggEKAoIBAQDC5JE48PJ/BFTLEseEbrGIdYB6w29hme4KFKmAqlLQ
XgI1lPEe1W1QXwb/G/tyf6Fj2J8CD5bfsRjDxAemFIBVrFwlunCk+Gs6xR7vzz4O kpwwZJAsm/9iuXy6svJf7Tzzc173Jkgzw7DzhzSf1VgRDrOCQS+IU6s8UXfUMJt/
Fonoj4pmleruLQrNY/bHa2WN97OdISyXzhOgDwSaqobnF0n/f0Mx+9sdHO3p8LNB AmP1SkU2mUJ/+pnEGRKtVkF9LCScinI95Iwt3xngdjMYXwk+S9Le3/8782ClBwZG
3JXUyBpwDNr/TTfAb4pbQEu3LF4p7uyd1eLhKzUxSiWzKtjB1EYObA87fZu0tBJZ vffXQ7hd5HnShgyqFVePgrKmr879NTylfvAWPwux2kdXNnbOHIrhcZm0NeMNf7hs
iGujuFiI7tf4qWKeuAoRa/cXkgVZhk0utYauDoa7qBZ5O6ZdEko9ov0+i5+1JGU/ UNURFlqo4rA0FV9dIHMryPkM7ygoaMog2XmcCnq/jf/MfPTQPYjQ9iLPOGrYi0pY
w5wrSPNAnM2lYVUn0kJmcV2gwa4RZFjdqp+/Fx+HnKbnhZEnAgMBAAGjLzAtMAkG X12uFb55duRGsvs7MIkNc8fn2VERoC69QX+GK+zAUGZ/AgMBAAGjLzAtMAkGA1Ud
A1UdEwQCMAAwCwYDVR0PBAQDAgUgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA0GCSqG EwQCMAAwCwYDVR0PBAQDAgUgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA0GCSqGSIb3
SIb3DQEBCwUAA4IBAQByWhNxX/L5QYBiMY4JM1RRciV4uI3F2vsc0yMFDSrZza+5 DQEBCwUAA4IBAQBpW7Ge5duo6/u3xIl0XhG/2dlSwlUUpO3Ecc13gmh44nJR66VH
tNJQS86hjQsCRZh9VshezvT7k1yVsAC4pnu2pzob8H3KG4vYBafMdl2Ghgv3RMix BEiimsol6gIgcSTk4pVY1DLb/09Nwv0TILl3Dc4QtXhM4gIlNRR79mLVsnPTef5e
J3NrBhcoYYhXEoZHost+htxEi7P3QBo/qDkk48/d30+aDPbms6kQd8Fj8+C5tD3b xkmesQaLihSCroHq8bONnO/Xgj5hCg8uI4j3vHtOikjABxQPOrCfc2uSrenU7aol
aznO5Qlni72uTaM7fNA8exoc/YZc83lsqv7v+UzNQR595jnYSIAZcgil1qqygOan 1HBijCY6R+pg6WxBOZ2Teiaoxjn78IxSKLXW0pLRJIPpet1hefR0sKkmPfVGyg8H
Zx/RsMGUz6EYI9lPpoyyVtw13SoQshfgwvUlvBMiekSuI/pp6N7QPK6C8DLO0tVv g7hqo+Houw8PQf2HLZnU656vyTlgIh6ES1x7Plb0cIw/LGr4rMkXs+DFg9SLbetT
gXJjDgioqHc3hcgG4cskLbfVnohiwdhQTFayrLEk ncT4plfucsek7ImN9Dw2w2hM2FZwB8ycZfmu
-----END CERTIFICATE----- -----END CERTIFICATE-----

View File

@ -1,18 +1,18 @@
-----BEGIN CERTIFICATE----- -----BEGIN CERTIFICATE-----
MIIC6jCCAdKgAwIBAgIBAjANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl MIIC5zCCAc+gAwIBAgIBAjANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
c3RDQTAeFw0xNzExMDIxMzQyNDhaFw0xOTExMDIxMzQyNDhaMC0xGjAYBgNVBAMT c3RDQTAeFw0xOTExMTUwNzI2NThaFw0yMTExMTQwNzI2NThaMCoxFzAVBgNVBAMM
EU1hY0Jvb2stQWlyLmxvY2FsMQ8wDQYDVQQKEwZjbGllbnQwggEiMA0GCSqGSIb3 DjAwMDQubm92YWxvY2FsMQ8wDQYDVQQKDAZjbGllbnQwggEiMA0GCSqGSIb3DQEB
DQEBAQUAA4IBDwAwggEKAoIBAQC8GptpL25hv1Qa3jCn4VLvDRH/SrHg9wXvqRkz AQUAA4IBDwAwggEKAoIBAQDcwo5SaoRpzkqy+Y9OADOL7U84h1VFfjb5Uu5raenO
HuiKMxYT30m4+kcaXv350CJrkV+8lR24wdN7DBVewpCUnyUBbzkLccy1LUzunZ3z elmHSaCZpVP2EsDUaWavtabHd9fa5Oq6lOyZPDZM6xttfi78EV4RRfEJ4XdvE54W
nm37j6cautD3rlC9gsC9d0uJ745FLx5t/6f1jMk9rWxn+4iSGAnkWC3mVaQxP1zQ MZSDAGz4RwxfGOQWBSFyp1NrzT32eqeDSyBrE3jhWx9UUUMwthg5YYjCdBwK+Dwf
q8GI97uob9HNb0OH6ygHJAcKOWB+85a29LIMa1uo/lT3hMr8sBg2vX+1F/gTusmW hsfS1YeAfXPNO/BGSTe0dPhjLztXe1BkFO5VAwkSXaPs2lBJddOgpTTLXQ3+hIPL
xVoQc9XJxBCs995qsH0UkZIuOY0XZp9/qFfcZv2QmslG8DojIIHKcujzu8bItE2M ozkiaTOMOvIMXsCspdhJbSc+jAAGZT5X9Tx7htYbPXIwyDJgeYGmLtr9XxPJ8XGR
OyL5NlWLvN6qg59hHzF4+D+T+8GkhhKWSC+xdY14eQ5fB4S5AgMBAAGjLzAtMAkG rpxkB3zASRcwQzsxTcwkG4E32T53tKsljTkNt15rIoo3AgMBAAGjLzAtMAkGA1Ud
A1UdEwQCMAAwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMCMA0GCSqG EwQCMAAwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMCMA0GCSqGSIb3
SIb3DQEBCwUAA4IBAQBLV4ZfhiKiFVnL/xO0MRGSKr3xd0LK64SW8Iw5DYkc0jNX DQEBCwUAA4IBAQBRtQMvUmiB84RmrGwHCP8hcGUWTz03mtTjGrykNA7YQkA09cRl
sDrRbj2I/KJ/Rc4AeKT751L+C+KBzYpFgiLrxDmt/5pmgiFH51hPQtL7kRC0z2NY RwiqYMWh6zHjdX1Ri3m9eIi/QSK/JX3S9zjZU9dSTtsdnMhkRL08kcxauv9gVXCG
EY/P+u4IFVSo+b1hHYU7y+OMj6/Vvd4x0ETS4rHWI4mPDfGfvClEVLOktgRKrMU5 G1Vf+lUVJxTqwuAmcLiDNg9/89sSlxQXFS7Jn9TwTvNiRoFoN5IiJ4LsXyr4uS9Y
9aTltF4U0FBUlYZTQBNBUFwBzj1+0lxK4EdhRmmWJ+uW9rgkQxpnUdbCPGvUKFRp S4Ul1aqetwpTV8bjpIbRJbOR8qBFshIZOPdgAT3RqbD/vpGzOvvV0c9g3VFLYoK3
3AbdHBAU9H2zVd2VZoJu6r7LMp6agxu0rYLgmamRAt+8rnDXvy7H1ZNdjT6fTbUO nQ63w1zhwYxC4MQD9rN7JRAKCDQBLNzf8PW0RSG9pVsf1IjaLxtsmQMgrAati/Ux
omVBMyJAc1+10gjpHw/EUD58t5/I5tZrnrANPgIs AG76LAn9sodtb4GtV8E9ITG0pMNlJyUovstS
-----END CERTIFICATE----- -----END CERTIFICATE-----

View File

@ -1,27 +1,27 @@
-----BEGIN RSA PRIVATE KEY----- -----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAvBqbaS9uYb9UGt4wp+FS7w0R/0qx4PcF76kZMx7oijMWE99J MIIEowIBAAKCAQEA3MKOUmqEac5KsvmPTgAzi+1POIdVRX42+VLua2npznpZh0mg
uPpHGl79+dAia5FfvJUduMHTewwVXsKQlJ8lAW85C3HMtS1M7p2d855t+4+nGrrQ maVT9hLA1Glmr7Wmx3fX2uTqupTsmTw2TOsbbX4u/BFeEUXxCeF3bxOeFjGUgwBs
965QvYLAvXdLie+ORS8ebf+n9YzJPa1sZ/uIkhgJ5Fgt5lWkMT9c0KvBiPe7qG/R +EcMXxjkFgUhcqdTa8099nqng0sgaxN44VsfVFFDMLYYOWGIwnQcCvg8H4bH0tWH
zW9Dh+soByQHCjlgfvOWtvSyDGtbqP5U94TK/LAYNr1/tRf4E7rJlsVaEHPVycQQ gH1zzTvwRkk3tHT4Yy87V3tQZBTuVQMJEl2j7NpQSXXToKU0y10N/oSDy6M5Imkz
rPfearB9FJGSLjmNF2aff6hX3Gb9kJrJRvA6IyCBynLo87vGyLRNjDsi+TZVi7ze jDryDF7ArKXYSW0nPowABmU+V/U8e4bWGz1yMMgyYHmBpi7a/V8TyfFxka6cZAd8
qoOfYR8xePg/k/vBpIYSlkgvsXWNeHkOXweEuQIDAQABAoIBAHnFV7peRDzvGUlT wEkXMEM7MU3MJBuBN9k+d7SrJY05DbdeayKKNwIDAQABAoIBAC6ww3Mw7iKGrAvg
cXgcvA2ZDn+QIVsbTzJ466FWbv+YVsCCmj0veHwv5oakIMQ2Fh4FAnqqr3dGuUbg dmuz5TMSFPBKx0E0aaIf5Sc4tmeiPu87Jkl4yyI/YyNJy5scG1MSyMeWJQMjXksm
+avc4p3tHKa2Aul+7ADE9I3TkCt8MZdyPPk6VXZ5gMCmy7X96MIM4Mwg5uBlRZmx jgGEtD9bMcrETZXvqgRB+IW4q3XcNKHkZCe6tyYh2JPDsAhU1XL2bMWFuYouSIP9
/S3Lffvlp/G0y/ICmwpulG1Z4y4A5Vc0Qf7fBO03Ekl31oReARnB6ex7RnDHH1mW EVLwd9bYfRJ/YO4577fY4Nl9GRI9hdOB0Y4dDvxHCprxXC/wH6NpvI5dktTPr2xl
RyLWNqyu9BhUbFpIyFPWDSkBcajNIbQ6qVJfGLm5Y2xVhwdqbyvY8M06uuMKz/IR pNqABKdG8XEzP0duIpQf5zXbfDAWRUEpB9MDBXqmKmdjdPnpNS7JtkmCtWogdA9F
SYfdIpiC4PpQZQzzXMn/6LTKWcCe0T+dBcWTZHC3C2abrC7+5fwFobs2xoUaCwz0 LcyFI3e86qB9HHaqq1hBsQEG/DYj0RxCcAQFqTfvpxmZOXDlfWdB7M8xnqkD5xT0
1CclogECgYEA6Jdv+2VSYIBLbS0VIe07JiZaQNd1QNg63MK/y7oqAKEzYvpWzJel s6K1TXECgYEA79Lx5FFxfkN/uZKQzV1slJ/GSyfJqKhRh38/8G1ncmSG5dh0QMht
owPdBU3GxZH6vUUF7sCABcjumEDazoqTtzHQBo0xYpJrjmAL0ANNGVvF09pJK2eq Tt7FbFhYwGZQY9iMq1g/ujlHAzdKbFHGRX0z30xP7kf0R/L0W2yHMq59Ys4nUhGY
yotxJJAS5/lQNSgWOxGVc6qu6ZpgeIXVLIx816yq04h10yVgZ2Lm3+ECgYEAzwj5 o1v2sGxgDDP9XPNm/MV8DCZcoLMxvvFrfWLMYcvWTJb8TBGQgqpcEF8CgYEA66Zu
/UVpN/ak6PwZ+Tq/8qOYjY2ABylRmP+T0Rkqmwh2B5Sp9oXjkDQwWseY0Wybhd8F d+l2W5LSTgwYeIAQuiIhhNLY9Ct94TWrum5QZMdeR+IUYn+dT817Qbmf4KiiihfJ
kO6BUCMUApnB3uU0baawVbDUSrt43SkkKV9m3pA35wA3pYw1a56QIEFr56npFYBS V8t3tYgBBamNpqMKpm7An+HnFgRoV3o8W0pjlKdaQ0EiwhTQsLJcmZ1JV/k9Dd4V
sn9yl/ZtNvnuwmrHWOq8HdwPJsWREyO61yknn9kCgYB1PdixpSo4AJOErePoHRfi Rl26M0DZRKTHIUWLt7nNYexydQpfWlfRX1/n9SkCgYAKphUzjCI79wdO2CEx3Tob
rBR0eObez+Aj5Xsea3G+rYMkkkHskUhp+omPodvfPS1h+If8CEbAI7+5OX/R+uJo B1UotSWRJZgpKg9Ov6zeOXR79DaFQeEIpX+ipfGa6XAcXtswKIT74dszW1skoCTr
xpAwrT1Gjb3vn5R0vyU+8havKmoVmgTqYg2fO4x8KBz5HoLONZfbHR9cG3gjaHrD pPmOqrbJ38wK/dC31oPSTkkm//xi+oEKj+TORKGnKQ/Q9sXV53bwmyt1vz8wOUwK
IPHRGXVmeXPDAiUtGBp+oQKBgQDDRIAkNPdMZUCczknhG1w3Cb20pKUAHCRt3YAZ jz6AASsMz494WTdPdf0MhQKBgQDGoBos6JPiy/aH4podt5Rhz7MBCdfkt2P7GAoP
U1cv6gcIl1rGvPko5VBGDsM/ouP8m6CwVYN5hdw1p7eG9z8/vFvMNn/EDJWuYkNN sjwBNiq53E3iWD54rXJfC98+teWLEFGdttrIIEL8StYixvqLHn8uRHNLk5t/YIDP
EkH/4J4ZLcdOSLOJ0X+2LH4Nfd/s+58D49i9IxtXItviWruyTZMnxooz01tFZgmv UfxtqEHkvlpVzMW6qhxzPqg7htF3huHX1djEqrx3p4xQ9xW1Xt9G0s4G6R9GPw8z
LY3F4QKBgDirafhlJqFK6sa8WesHpD5+lm3Opzi4Ua8fAGHy2oHN3WCEL74q691C nNsfQQKBgF1nvj5xhD7fiVzS7NrjtBslDxKGQCfs9f1Xl2eadGp6pgwG/hvw5oO7
fA0P2UrzYiF7dXf4fgK9eMMQsdWS4nKyCSqM6xE4EAhAHUTYzY3ApNjI3XFDIrKC gtoYJuPq+Zu92a+UDQVErXMHiXn3iza/3EOf2BP9zbq9mBGZtKmLExP0QGEmDygb
oQefIOLum2UyWFuEoUtrEfc5fxktiQohCwuAvwC59EwhmsNlECA8 Yo18YdfwWwqxvEf+jt2URv0w+KNWL/3j5rDmngNa1iNubX3p1AK6
-----END RSA PRIVATE KEY----- -----END RSA PRIVATE KEY-----

View File

@ -1,27 +1,27 @@
-----BEGIN RSA PRIVATE KEY----- -----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEA1Dv5C92q91rKKTxddqgH/nMAx5JRiSMSbI3aGV4CNZTxHtVt MIIEpQIBAAKCAQEAwuSROPDyfwRUyxLHhG6xiHWAesNvYZnuChSpgKpS0JKcMGSQ
UF8G/xv7cn+hY9ifAg+W37EYw8QHphSAVaxcJbpwpPhrOsUe788+DhaJ6I+KZpXq LJv/Yrl8urLyX+0883Ne9yZIM8Ow84c0n9VYEQ6zgkEviFOrPFF31DCbfwJj9UpF
7i0KzWP2x2tljfeznSEsl84ToA8EmqqG5xdJ/39DMfvbHRzt6fCzQdyV1MgacAza NplCf/qZxBkSrVZBfSwknIpyPeSMLd8Z4HYzGF8JPkvS3t//O/NgpQcGRr3310O4
/003wG+KW0BLtyxeKe7sndXi4Ss1MUolsyrYwdRGDmwPO32btLQSWYhro7hYiO7X XeR50oYMqhVXj4Kypq/O/TU8pX7wFj8LsdpHVzZ2zhyK4XGZtDXjDX+4bFDVERZa
+KlinrgKEWv3F5IFWYZNLrWGrg6Gu6gWeTumXRJKPaL9PouftSRlP8OcK0jzQJzN qOKwNBVfXSBzK8j5DO8oKGjKINl5nAp6v43/zHz00D2I0PYizzhq2ItKWF9drhW+
pWFVJ9JCZnFdoMGuEWRY3aqfvxcfh5ym54WRJwIDAQABAoIBABNq2UJIqZev6scT eXbkRrL7OzCJDXPH59lREaAuvUF/hivswFBmfwIDAQABAoIBAQCYa9gj11Vf/0wt
CsoMXY7eHrgjnuoZF1pvMAEaJMGaOuVDSZkM2KsGeF7lZnKoIwQhQQB+R3HBwaFk kh9WNJhGJ9d2q5hVleR0H9q9FPg1xSPAOTYEnXBrjrO89CzY1xq/L7DKzDbVvSuM
RsmP125sPFobkFP0LPxrzZWkYkGwwEzacoAQBuj7uFxOayAuBXTe0CGjbRA7z4QH GmcOxfTdSkkcCs0Y6o7WWsTDv8ws1frFIPPmkpBOtPhDRHS1+eq38akkgKZ+P1te
DgiejNqfXhp4nHdxaiL5Lq1b7SlmarGXup3kcVTWxIiah4MK0o4YGiyQC8Mr+a7w mMiNIwQtAE6jWPuvcTIVee9QwaCn+5ZYIwICORNFoLsl7sKdLOfccSO7v9L/Az5r
UGYqdKQQMLOtly/HTEcyd/DAruboV+5L+pYx/pcFFXJupK6yaxELLHKeHAKA9MmA AT4xrJwpKl5MjOGzOxFv6M1rTh/Y9e17U+2/QQDnW4U7C4/gkQ1urJddaeDDnz8t
cnMNVpCQ8VdOyR9qrfwtABqd8egKea2Z3P+dK6PlxUAQe2kYlXxS0N+i/eU6PKYM GLAnshCdF8eL3vAKO6sMJiEGuVe3b2oBYrRjp7FSB1uLWWlFRb7TGE07UXP0JZDn
B76UhQECgYEA9GvBNG6ffQqkX6bNLQUsU+nvKAQeFq02ua9LFKYw2sVO6RfUjrNz W1lmUbcBAoGBAO4/37Obk1pM6GQzS49AwLJtz5Z9DpxMSaVW6XHQlOq6RBNQsMR0
u2cwAUXSp+tnPMesKEVOOUfRMN/QiI/JNw62uSWSKJ/64103vX+F5AjQmE2f7Zgt MS5k5TZgX0HZXAu0dGaPNzD7868dwTZE7tn6a1QanfmrqQVbJxHWTJtPEE0QGpGI
o3X23cV544HM8E5xCvIe7DFLK6cUdRQngu/uWi63cB4hMVpB9MfZJscCgYEA3kne vg2D6iiYUE0mVEiKf7dNp6hpp9ioYIdsRQK0H/u3sU/JfEFr00XpMb+BAoGBANFp
2sE4b67JkjmHGKahBJM5/iAHBqSubQmufIlaiLkyrDYGN2D+mi0fAF+uQ9KmNOrv wMIcB7RbyShO8QR/kVpahlOOnNDP7e+9KdUFl8i300ecO2QNR+hlQ+565J8nsANj
TsZ1bZu9f+VvaH7xNJzcUriXYs+HoN9/CWnAR0ktSm8RN7BznVd41NuLnsoWUt41 Y2kLMls2DTzMefrEZPecb8onjGFmSkwf9uCs8vmorlYmYmNlJkLL06ZN3SrpmHBD
jglpNYMwy7JPRLQNgYHErG7puksNawFvSKQEYqECgYA4N/iueKtSdXotTg5vRntV GogkCt1qkrTgtszjSqZe94UcpT+mfatSK4lRlSX/AoGAXdE7Ns/Ji6KDVIm6dFOs
qb8KczgAe0LVHs6kJz2hdDScRJDtabU665cNE+RKH0kVn8+nS5mcbzpchX5PitL7 TdbeCsV+DmAgFAKQdKgNLA1jJzP8F7Aleb5zYCE9AYIlM9rAh25X7msYf1m5LrSg
SPUaTNv7YCCy3yQNACHpu2VPQruASLpmmKF5jQxmGdrrgv9ZRyt5pDToC3wXGdWk Vae9weWlVZ6aNSi6ztRTYEkXAzGXNL3jERFkEM5BuM+iGtqnBjiHD9NjK/bJ5Cnn
tk8aixhCP4ve8CWvibAWzQKBgExxxwwf6tKtn3CEDCu0EifKoeT9Cq2EMOAatkDp VvQ1L/sa0G9oBZ7/GCWG2IECgYEAqAOmENb2Y4FEyl9Txl0nXIvGvCFutaYt66wk
05K1bfG/Wn/tAWHwJnswbHOym6oTKV1D7tpU9uRm+NtM3JKlZzejd5xpllECy2Nn dPIQzoyWKh0yFVsGd3FP6HWXGg54jK9gIfZGx6F9S2tu7oBF1dggZNwIKFkugRcg
VNKvHb49WAR40CnKDSnWnrtq8CZreKtyHRZkGYHTvmL4MLTa9dH/Cq4gZWrpQWYP NzDrnNz2Ss5vH/oWkX8BZ6uPKA/VKzTbg6EPSohn/lFQuOAfk44cHyNVfdTxfNPn
0dpBAoGAD4inpSm7SMN3/rgYXEU1CMRKXREbEWhondiTXZ8x8ugnnYtfhcBvCMif dDwNYzcCgYEA3Wig1HRNvTOnSskFz8eTmpu89hg1atuAk+c2d1Z+9HKVjkc6B/91
JQ8tso63hCHvKPDViTbLDyV7OuGBEPTQAyacX0FJmr7g5ERlvfmL4yjmvW7Bcclh pabnbujERtH3HW9TQ9+VVfImgC3Jy+TjsS3d6nA7e9060N9z30mVEY5lq03mKAMl
yrgbJXl2pdzMt9GpogIYFW0YyOr6VPIrGf62kRNrv2E8wyXEFAI= tSKFk4fRRxsKPsBN/NS0BiU8LJTzsDwLwRm9T4BNos+I35a8tFCCmtw=
-----END RSA PRIVATE KEY----- -----END RSA PRIVATE KEY-----

View File

@ -181,6 +181,11 @@ node.name = emqx@127.0.0.1
## Value: String ## Value: String
node.cookie = emqxsecretcookie node.cookie = emqxsecretcookie
## Node Max Clients Size.
##
## Value: String
node.max_clients = 1024000
## Data dir for the node ## Data dir for the node
## ##
## Value: Folder ## Value: Folder
@ -942,7 +947,7 @@ listener.tcp.external.access.1 = allow all
## Enable the option for X.509 certificate based authentication. ## Enable the option for X.509 certificate based authentication.
## EMQX will use the common name of certificate as MQTT username. ## EMQX will use the common name of certificate as MQTT username.
## ##
## Value: cn | dn ## Value: cn | dn | crt
## listener.tcp.external.peer_cert_as_username = cn ## listener.tcp.external.peer_cert_as_username = cn
## The TCP backlog defines the maximum length that the queue of pending ## The TCP backlog defines the maximum length that the queue of pending
@ -1293,10 +1298,10 @@ listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
## Value: on | off ## Value: on | off
## listener.ssl.external.honor_cipher_order = on ## listener.ssl.external.honor_cipher_order = on
## Use the CN, EN or CRT field from the client certificate as a username. ## Use the CN, DN or CRT field from the client certificate as a username.
## Notice that 'verify' should be set as 'verify_peer'. ## Notice that 'verify' should be set as 'verify_peer'.
## ##
## Value: cn | en | crt ## Value: cn | dn | crt
## listener.ssl.external.peer_cert_as_username = cn ## listener.ssl.external.peer_cert_as_username = cn
## TCP backlog for the SSL connection. ## TCP backlog for the SSL connection.

View File

@ -41,5 +41,5 @@
-define(LOG(Level, Format, Args), -define(LOG(Level, Format, Args),
begin begin
(logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end})) (logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}))
end). end).

View File

@ -334,6 +334,13 @@ end}.
hidden hidden
]}. ]}.
%% @see node.max_clients
{mapping, "node.max_clients", "emqx.max_clients", [
{default, 1024000},
{datatype, integer},
hidden
]}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% RPC %% RPC
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -4,7 +4,7 @@
{gproc, "0.8.0"}, % hex {gproc, "0.8.0"}, % hex
{replayq, "0.1.1"}, %hex {replayq, "0.1.1"}, %hex
{esockd, "5.5.0"}, %hex {esockd, "5.5.0"}, %hex
{ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.8"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.9"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}. ]}.

34
src/emqx.app.src.script Normal file
View File

@ -0,0 +1,34 @@
%%-*- mode: erlang -*-
%% .app.src.script
Config = case os:getenv("EMQX_DESC") of
false -> CONFIG; % env var not defined
[] -> CONFIG; % env var set to empty string
Desc ->
[begin
AppConf0 = lists:keystore(description, 1, AppConf, {description, Desc}),
{application, App, AppConf0}
end || Conf = {application, App, AppConf} <- CONFIG]
end,
RemoveLeadingV =
fun(Tag) ->
case re:run(Tag, "v\[0-9\]+\.\[0-9\]+\.*") of
nomatch ->
Tag;
{match, _} ->
%% if it is a version number prefixed by 'v' then remove the 'v'
"v" ++ Vsn = Tag,
Vsn
end
end,
case os:getenv("EMQX_DEPS_DEFAULT_VSN") of
false -> Config; % env var not defined
[] -> Config; % env var set to empty string
Tag ->
[begin
AppConf0 = lists:keystore(vsn, 1, AppConf, {vsn, RemoveLeadingV(Tag)}),
{application, App, AppConf0}
end || Conf = {application, App, AppConf} <- Config]
end.

View File

@ -256,7 +256,7 @@ aggre(Routes) ->
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async) -spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
-> emqx_types:deliver_result()). -> emqx_types:deliver_result()).
forward(Node, To, Delivery, async) -> forward(Node, To, Delivery, async) ->
case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
true -> ok; true -> ok;
{badrpc, Reason} -> {badrpc, Reason} ->
?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]), ?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
@ -264,7 +264,7 @@ forward(Node, To, Delivery, async) ->
end; end;
forward(Node, To, Delivery, sync) -> forward(Node, To, Delivery, sync) ->
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
{badrpc, Reason} -> {badrpc, Reason} ->
?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]), ?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
{error, badrpc}; {error, badrpc};

View File

@ -149,7 +149,14 @@ call(CPid, Req) ->
init({Transport, RawSocket, Options}) -> init({Transport, RawSocket, Options}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, Socket} = Transport:wait(RawSocket), case Transport:wait(RawSocket) of
{ok, Socket} ->
do_init(Transport, Socket, Options);
{error, Reason} ->
?LOG(warning, "connection failed to establish: ~p", [Reason])
end.
do_init(Transport, Socket, Options) ->
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
@ -352,11 +359,11 @@ handle(info, {timeout, Timer, emit_stats},
{keep_state, NState#state{gc_state = GcState1}, hibernate}; {keep_state, NState#state{gc_state = GcState1}, hibernate};
{shutdown, Reason} -> {shutdown, Reason} ->
?LOG(error, "Shutdown exceptionally due to ~p", [Reason]), ?LOG(error, "Shutdown exceptionally due to ~p", [Reason]),
shutdown(Reason, NState) self() ! {shutdown, Reason}
end; end;
handle(info, {shutdown, discard, {ClientId, ByPid}}, State) -> handle(info, {shutdown, discard, {ClientId, ByPid}}, State) ->
?LOG(error, "Discarded by ~s:~p", [ClientId, ByPid]), ?LOG(warning, "Discarded by ~s:~p", [ClientId, ByPid]),
shutdown(discard, State); shutdown(discard, State);
handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) -> handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) ->

View File

@ -44,6 +44,8 @@
-export([lookup_conn_pid/1]). -export([lookup_conn_pid/1]).
-export([max_client_size/0]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([ init/1
, handle_call/3 , handle_call/3
@ -148,6 +150,9 @@ lookup_conn_pid(ClientId) when is_binary(ClientId) ->
notify(Msg) -> notify(Msg) ->
gen_server:cast(?CM, {notify, Msg}). gen_server:cast(?CM, {notify, Msg}).
max_client_size() ->
ets:info(?CONN_TAB, size).
%%----------------------------------------------------------------------------- %%-----------------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%----------------------------------------------------------------------------- %%-----------------------------------------------------------------------------

View File

@ -115,7 +115,7 @@ run_fold(HookPoint, Args, Acc) ->
do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) -> do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
case filter_passed(Filter, Args) andalso execute(Action, Args) of case filter_passed(Filter, Args) andalso safe_execute(Action, Args) of
%% stop the hook chain and return %% stop the hook chain and return
stop -> ok; stop -> ok;
%% continue the hook chain, in following cases: %% continue the hook chain, in following cases:
@ -128,7 +128,7 @@ do_run([], _Args) ->
do_run_fold([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) -> do_run_fold([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
Args1 = Args ++ [Acc], Args1 = Args ++ [Acc],
case filter_passed(Filter, Args1) andalso execute(Action, Args1) of case filter_passed(Filter, Args1) andalso safe_execute(Action, Args1) of
%% stop the hook chain %% stop the hook chain
stop -> Acc; stop -> Acc;
%% stop the hook chain with NewAcc %% stop the hook chain with NewAcc
@ -148,6 +148,15 @@ filter_passed(undefined, _Args) -> true;
filter_passed(Filter, Args) -> filter_passed(Filter, Args) ->
execute(Filter, Args). execute(Filter, Args).
safe_execute(Fun, Args) ->
try execute(Fun, Args) of
Result -> Result
catch
_:Reason:Stacktrace ->
?LOG(error, "Failed to execute ~p(~p): ~p", [Fun, Args, {Reason, Stacktrace}]),
ok
end.
%% @doc execute a function. %% @doc execute a function.
execute(Fun, Args) when is_function(Fun) -> execute(Fun, Args) when is_function(Fun) ->
erlang:apply(Fun, Args); erlang:apply(Fun, Args);

View File

@ -30,8 +30,7 @@ init([]) ->
child_spec(emqx_stats, worker), child_spec(emqx_stats, worker),
child_spec(emqx_metrics, worker), child_spec(emqx_metrics, worker),
child_spec(emqx_ctl, worker), child_spec(emqx_ctl, worker),
child_spec(emqx_zone, worker), child_spec(emqx_zone, worker)]}}.
child_spec(emqx_tracer, worker)]}}.
child_spec(M, worker) -> child_spec(M, worker) ->
#{id => M, #{id => M,

View File

@ -75,7 +75,7 @@ format(#{level:=Level,msg:=Msg0,meta:=Meta},Config0)
end, end,
Config#{chars_limit=>Size} Config#{chars_limit=>Size}
end, end,
string:trim(format_msg(Msg0,Meta,Config1)); format_msg(Msg0,Meta,Config1);
true -> true ->
"" ""
end, end,
@ -134,7 +134,7 @@ to_string(X,_) when is_list(X) ->
_ -> io_lib:format(?FormatP,[X]) _ -> io_lib:format(?FormatP,[X])
end; end;
to_string(X,_) -> to_string(X,_) ->
io_lib:format("~s",[X]). io_lib:format(?FormatP,[X]).
printable_list([]) -> printable_list([]) ->
false; false;
@ -196,7 +196,7 @@ do_format_msg({Format0,Args},Depth,Opts) ->
%% already been here - avoid failing cyclically %% already been here - avoid failing cyclically
erlang:raise(C,R,S); erlang:raise(C,R,S);
_ -> _ ->
format_msg({FormatError,[Format0,Args]},Depth,Opts) do_format_msg({FormatError,[Format0,Args]},Depth,Opts)
end end
end. end.

View File

@ -294,12 +294,12 @@ do_inc_recv(?PACKET(?DISCONNECT)) ->
do_inc_recv(?PACKET(?AUTH)) -> do_inc_recv(?PACKET(?AUTH)) ->
inc('packets.auth.received'); inc('packets.auth.received');
do_inc_recv(_Packet) -> do_inc_recv(_Packet) ->
ignore. ok.
%% @doc Inc packets sent. Will not count $SYS PUBLISH. %% @doc Inc packets sent. Will not count $SYS PUBLISH.
-spec(inc_sent(emqx_mqtt_types:packet()) -> ok | ignore). -spec(inc_sent(emqx_mqtt_types:packet()) -> ok).
inc_sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) -> inc_sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) ->
ignore; ok;
inc_sent(Packet) -> inc_sent(Packet) ->
inc('packets.sent'), inc('packets.sent'),
do_inc_sent(Packet). do_inc_sent(Packet).
@ -341,7 +341,7 @@ do_inc_sent(?PACKET(?DISCONNECT)) ->
do_inc_sent(?PACKET(?AUTH)) -> do_inc_sent(?PACKET(?AUTH)) ->
inc('packets.auth.sent'); inc('packets.auth.sent');
do_inc_sent(_Packet) -> do_inc_sent(_Packet) ->
ignore. ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
@ -356,7 +356,7 @@ init([]) ->
% Store reserved indices % Store reserved indices
lists:foreach(fun({Type, Name}) -> lists:foreach(fun({Type, Name}) ->
Idx = reserved_idx(Name), Idx = reserved_idx(Name),
Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)}, Metric = #metric{name = Name, type = Type, idx = Idx},
true = ets:insert(?TAB, Metric), true = ets:insert(?TAB, Metric),
ok = counters:put(CRef, Idx, 0) ok = counters:put(CRef, Idx, 0)
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS), end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS),

View File

@ -15,9 +15,7 @@
-module(emqx_mountpoint). -module(emqx_mountpoint).
-include("emqx.hrl"). -include("emqx.hrl").
-include("logger.hrl"). -include("types.hrl").
-logger_header("[Mountpoint]").
-export([ mount/2 -export([ mount/2
, unmount/2 , unmount/2
@ -32,30 +30,34 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% APIs %% APIs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
mount(undefined, Any) -> mount(undefined, Any) ->
Any; Any;
mount(MountPoint, Topic) when is_binary(Topic) ->
prefix(MountPoint, Topic);
mount(MountPoint, Msg = #message{topic = Topic}) -> mount(MountPoint, Msg = #message{topic = Topic}) ->
Msg#message{topic = <<MountPoint/binary, Topic/binary>>}; Msg#message{topic = prefix(MountPoint, Topic)};
mount(MountPoint, TopicFilters) when is_list(TopicFilters) -> mount(MountPoint, TopicFilters) when is_list(TopicFilters) ->
[{<<MountPoint/binary, Topic/binary>>, SubOpts} || {Topic, SubOpts} <- TopicFilters]. [{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters].
unmount(undefined, Msg) -> unmount(undefined, Any) ->
Msg; Any;
unmount(MountPoint, Topic) when is_binary(Topic) ->
case string:prefix(Topic, MountPoint) of
nomatch -> Topic;
Topic1 -> Topic1
end;
unmount(MountPoint, Msg = #message{topic = Topic}) -> unmount(MountPoint, Msg = #message{topic = Topic}) ->
try split_binary(Topic, byte_size(MountPoint)) of case string:prefix(Topic, MountPoint) of
{MountPoint, Topic1} -> Msg#message{topic = Topic1} nomatch -> Msg;
catch Topic1 -> Msg#message{topic = Topic1}
_Error:Reason ->
?LOG(error, "Unmount error : ~p", [Reason]),
Msg
end. end.
-spec(replvar(maybe(mountpoint()), map()) -> maybe(mountpoint())).
replvar(undefined, _Vars) -> replvar(undefined, _Vars) ->
undefined; undefined;
replvar(MountPoint, #{client_id := ClientId, username := Username}) -> replvar(MountPoint, #{client_id := ClientId, username := Username}) ->
lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]). lists:foldl(fun feed_var/2, MountPoint,
[{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
feed_var({<<"%c">>, ClientId}, MountPoint) -> feed_var({<<"%c">>, ClientId}, MountPoint) ->
emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint); emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
@ -64,3 +66,5 @@ feed_var({<<"%u">>, undefined}, MountPoint) ->
feed_var({<<"%u">>, Username}, MountPoint) -> feed_var({<<"%u">>, Username}, MountPoint) ->
emqx_topic:feed_var(<<"%u">>, Username, MountPoint). emqx_topic:feed_var(<<"%u">>, Username, MountPoint).
prefix(MountPoint, Topic) ->
<<MountPoint/binary, Topic/binary>>.

View File

@ -126,31 +126,13 @@ default_caps() ->
?DEFAULT_CAPS. ?DEFAULT_CAPS.
get_caps(Zone, publish) -> get_caps(Zone, publish) ->
with_env(Zone, '$mqtt_pub_caps', filter_caps(?PUBCAP_KEYS, get_caps(Zone));
fun() ->
filter_caps(?PUBCAP_KEYS, get_caps(Zone))
end);
get_caps(Zone, subscribe) -> get_caps(Zone, subscribe) ->
with_env(Zone, '$mqtt_sub_caps', filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
fun() ->
filter_caps(?SUBCAP_KEYS, get_caps(Zone))
end).
get_caps(Zone) -> get_caps(Zone) ->
with_env(Zone, '$mqtt_caps', maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)} || {Cap, Def} <- ?DEFAULT_CAPS]).
fun() ->
maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)}
|| {Cap, Def} <- ?DEFAULT_CAPS])
end).
filter_caps(Keys, Caps) -> filter_caps(Keys, Caps) ->
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
with_env(Zone, Key, InitFun) ->
case emqx_zone:get_env(Zone, Key) of
undefined -> Caps = InitFun(),
ok = emqx_zone:set_env(Zone, Key, Caps),
Caps;
ZoneCaps -> ZoneCaps
end.

View File

@ -166,4 +166,7 @@ call(Req) ->
gen_server:call(?OS_MON, Req, infinity). gen_server:call(?OS_MON, Req, infinity).
ensure_check_timer(State = #{cpu_check_interval := Interval}) -> ensure_check_timer(State = #{cpu_check_interval := Interval}) ->
State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. case erlang:system_info(system_architecture) of
"x86_64-pc-linux-musl" -> State;
_ -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}
end.

View File

@ -162,10 +162,11 @@ will_msg(#mqtt_packet_connect{client_id = ClientId,
will_qos = QoS, will_qos = QoS,
will_topic = Topic, will_topic = Topic,
will_props = Properties, will_props = Properties,
will_payload = Payload}) -> will_payload = Payload,
proto_ver = ProtoVer}) ->
Msg = emqx_message:make(ClientId, QoS, Topic, Payload), Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
Msg#message{flags = #{dup => false, retain => Retain}, Msg#message{flags = #{dup => false, retain => Retain},
headers = merge_props(#{username => Username}, Properties)}. headers = merge_props(#{username => Username, proto_ver => ProtoVer}, Properties)}.
merge_props(Headers, undefined) -> merge_props(Headers, undefined) ->
Headers; Headers;

View File

@ -260,10 +260,22 @@ set_protover(_Packet, PState) ->
received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT -> received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
{error, proto_not_connected, PState}; {error, proto_not_connected, PState};
received(Packet = ?PACKET(?CONNECT), PState = #pstate{connected = false}) ->
case check_max_clients() of
true ->
?LOG(error, "Connection rejected due to max clients limitation"),
connack({?RC_QUOTA_EXCEEDED, PState#pstate{credentials = credentials(PState)}});
false ->
do_received(Packet, PState)
end;
received(?PACKET(?CONNECT), PState = #pstate{connected = true}) -> received(?PACKET(?CONNECT), PState = #pstate{connected = true}) ->
{error, proto_unexpected_connect, PState}; {error, proto_unexpected_connect, PState};
received(Packet = ?PACKET(Type), PState) -> received(Packet, PState) ->
do_received(Packet, PState).
do_received(Packet = ?PACKET(Type), PState) ->
trace(recv, Packet), trace(recv, Packet),
PState1 = set_protover(Packet, PState), PState1 = set_protover(Packet, PState),
try emqx_packet:validate(Packet) of try emqx_packet:validate(Packet) of
@ -562,10 +574,10 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Creden
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId), do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
PState = #pstate{session = SPid, credentials = Credentials}) -> PState = #pstate{session = SPid, credentials = Credentials, proto_ver = ProtoVer}) ->
Msg = emqx_mountpoint:mount(mountpoint(Credentials), Msg = emqx_mountpoint:mount(mountpoint(Credentials),
emqx_packet:to_message(Credentials, Packet)), emqx_packet:to_message(Credentials, Packet)),
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState). puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, emqx_message:set_header(proto_ver, ProtoVer, Msg))), PState).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Puback -> Client %% Puback -> Client
@ -834,8 +846,8 @@ check_will_retain(#mqtt_packet_connect{will_retain = false, proto_ver = ?MQTT_PR
ok; ok;
check_will_retain(#mqtt_packet_connect{will_retain = true, proto_ver = ?MQTT_PROTO_V5}, #pstate{zone = Zone}) -> check_will_retain(#mqtt_packet_connect{will_retain = true, proto_ver = ?MQTT_PROTO_V5}, #pstate{zone = Zone}) ->
case emqx_zone:get_env(Zone, mqtt_retain_available, true) of case emqx_zone:get_env(Zone, mqtt_retain_available, true) of
true -> {error, ?RC_RETAIN_NOT_SUPPORTED}; true -> ok;
false -> ok false -> {error, ?RC_RETAIN_NOT_SUPPORTED}
end; end;
check_will_retain(_Packet, _PState) -> check_will_retain(_Packet, _PState) ->
ok. ok.
@ -1027,7 +1039,7 @@ raw_topic_filters(#pstate{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridg
end. end.
mountpoint(Credentials) -> mountpoint(Credentials) ->
maps:get(mountpoint, Credentials, undefined). emqx_mountpoint:replvar(maps:get(mountpoint, Credentials, undefined), Credentials).
do_check_banned(_EnableBan = true, Credentials) -> do_check_banned(_EnableBan = true, Credentials) ->
case emqx_banned:check(Credentials) of case emqx_banned:check(Credentials) of
@ -1048,3 +1060,8 @@ do_acl_check(Action, Credentials, Topic, AllowTerm, DenyTerm) ->
allow -> AllowTerm; allow -> AllowTerm;
deny -> DenyTerm deny -> DenyTerm
end. end.
check_max_clients() ->
CurrentClientSize = emqx_cm:max_client_size(),
MaxClients = emqx_config:get_env(max_clients, 1024000),
CurrentClientSize >= MaxClients.

View File

@ -16,24 +16,41 @@
-module(emqx_rpc). -module(emqx_rpc).
-export([ call/4 -export([ call/4
, call/5
, cast/4 , cast/4
, cast/5
, multicall/4 , multicall/4
, multicall/5
]). ]).
-define(RPC, gen_rpc). -define(RPC, gen_rpc).
-define(DefaultClientNum, 1).
call(Node, Mod, Fun, Args) -> call(Node, Mod, Fun, Args) ->
filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)). filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)).
call(Key, Node, Mod, Fun, Args) ->
filter_result(?RPC:call(rpc_node({Key, Node}), Mod, Fun, Args)).
multicall(Nodes, Mod, Fun, Args) -> multicall(Nodes, Mod, Fun, Args) ->
filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)). filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).
multicall(Key, Nodes, Mod, Fun, Args) ->
filter_result(?RPC:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)).
cast(Node, Mod, Fun, Args) -> cast(Node, Mod, Fun, Args) ->
filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)). filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).
rpc_node(Node) -> cast(Key, Node, Mod, Fun, Args) ->
{ok, ClientNum} = application:get_env(gen_rpc, tcp_client_num), filter_result(?RPC:cast(rpc_node({Key, Node}), Mod, Fun, Args)).
{Node, rand:uniform(ClientNum)}.
rpc_node(Node) when is_atom(Node) ->
ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
{Node, rand:uniform(ClientNum)};
rpc_node({Key, Node}) when is_atom(Node) ->
ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
{Node, erlang:phash2(Key, ClientNum) + 1}.
rpc_nodes(Nodes) -> rpc_nodes(Nodes) ->
rpc_nodes(Nodes, []). rpc_nodes(Nodes, []).
@ -43,7 +60,6 @@ rpc_nodes([], Acc) ->
rpc_nodes([Node | Nodes], Acc) -> rpc_nodes([Node | Nodes], Acc) ->
rpc_nodes(Nodes, [rpc_node(Node) | Acc]). rpc_nodes(Nodes, [rpc_node(Node) | Acc]).
filter_result({Error, Reason}) filter_result({Error, Reason})
when Error =:= badrpc; Error =:= badtcp -> when Error =:= badrpc; Error =:= badtcp ->
{badrpc, Reason}; {badrpc, Reason};

View File

@ -401,13 +401,22 @@ handle_call(stats, _From, State) ->
reply(stats(State), State); reply(stats(State), State);
handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) -> handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
?LOG(warning, "Discarded by ~p", [ByPid]), ?LOG(notice, "Discarded by ~p", [ByPid]),
{stop, {shutdown, discarded}, ok, State}; {stop, {shutdown, discarded}, ok, State};
handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) -> handle_call({discard, ByPid}, _From, State = #state{conn_pid = ConnPid, client_id = ClientId}) ->
?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid]), ?LOG(notice, "Conn ~p is discarded by ~p", [ConnPid, ByPid]),
case ClientId of
<<"d:", _Sn/binary>> ->
ConnPid ! {shutdown, discard, {ClientId, ByPid}}, ConnPid ! {shutdown, discard, {ClientId, ByPid}},
{stop, {shutdown, discarded}, ok, State}; {stop, {shutdown, discarded}, ok, State};
_ ->
Topic = <<"$SYS/kickout">>,
Msg = emqx_message:make(broker, 1, Topic, <<"The client has been kicked out">>),
{_, State1} = handle_dispatch([{Topic, Msg}], State),
erlang:send_after(5000, self(), {kicked, ByPid}),
{reply, ok, State1}
end;
%% PUBLISH: This is only to register packetId to session state. %% PUBLISH: This is only to register packetId to session state.
%% The actual message dispatching should be done by the caller (e.g. connection) process. %% The actual message dispatching should be done by the caller (e.g. connection) process.
@ -463,12 +472,17 @@ handle_call(Req, _From, State) ->
%% SUBSCRIBE: %% SUBSCRIBE:
handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> State = #state{zone = Zone, client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
MaxSub = get_env(Zone, max_subscriptions, 0),
{ReasonCodes, Subscriptions1} = {ReasonCodes, Subscriptions1} =
lists:foldr( lists:foldr(
fun ({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) when fun ({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) when ?IS_QOS(RC) ->
RC == ?QOS_0; RC == ?QOS_1; RC == ?QOS_2 -> case exceeded_subscription_quota(MaxSub, SubMap) of
{[QoS|RcAcc], do_subscribe(ClientId, Username, Topic, SubOpts, SubMap)}; true ->
{[?RC_QUOTA_EXCEEDED|RcAcc], SubMap};
false ->
{[QoS|RcAcc], do_subscribe(ClientId, Username, Topic, SubOpts, SubMap)}
end;
({_Topic, #{rc := RC}}, {RcAcc, SubMap}) -> ({_Topic, #{rc := RC}}, {RcAcc, SubMap}) ->
{[RC|RcAcc], SubMap} {[RC|RcAcc], SubMap}
end, {[], Subscriptions}, TopicFilters), end, {[], Subscriptions}, TopicFilters),
@ -493,16 +507,23 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1})); noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1}));
%% PUBACK: %% PUBACK:
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight,
noreply( client_id = ClientId,
conn_pid = ConnPid}) ->
case emqx_inflight:contain(PacketId, Inflight) of case emqx_inflight:contain(PacketId, Inflight) of
true -> true ->
ensure_stats_timer(dequeue(acked(puback, PacketId, State))); case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, {_, #message{topic = <<"$SYS/kickout">>}}, _Ts}} ->
ConnPid ! {shutdown, discard, {ClientId, undefined}},
{stop, {shutdown, discarded}, State};
_ ->
{noreply, ensure_stats_timer(dequeue(acked(puback, PacketId, State)))}
end;
false -> false ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.puback.missed'), ok = emqx_metrics:inc('packets.puback.missed'),
State {noreply, State}
end); end;
%% PUBCOMP: %% PUBCOMP:
handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
@ -603,7 +624,7 @@ handle_info({timeout, Timer, emit_stats},
{noreply, NewState#state{gc_state = GcState1}, hibernate}; {noreply, NewState#state{gc_state = GcState1}, hibernate};
{shutdown, Reason} -> {shutdown, Reason} ->
?LOG(warning, "Shutdown exceptionally due to ~p", [Reason]), ?LOG(warning, "Shutdown exceptionally due to ~p", [Reason]),
shutdown(Reason, NewState) self() ! {shutdown, Reason}
end; end;
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
@ -646,6 +667,13 @@ handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
[ConnPid, Pid, Reason]), [ConnPid, Pid, Reason]),
{noreply, State}; {noreply, State};
handle_info({shutdown, Reason}, State) ->
shutdown(Reason, State);
handle_info({kicked, ByPid}, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
{stop, {shutdown, discarded}, State};
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]), ?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
@ -877,12 +905,14 @@ process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #sta
true -> process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State); true -> process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State);
false -> process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State) false -> process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State)
end; end;
process_subopts([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, State = #state{}) -> process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags, headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) ->
process_subopts(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, State); process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session);
process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags}, State = #state{}) -> process_subopts([{rap, _}|Opts], Msg = #message{headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) ->
process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, State); process_subopts(Opts, Msg, Session);
process_subopts([{rap, _}|Opts], Msg, State) -> process_subopts([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session = #session{}) ->
process_subopts(Opts, Msg, State); process_subopts(Opts, Msg, Session);
process_subopts([{rap, _}|Opts], Msg = #message{flags = Flags}, Session) ->
process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session);
process_subopts([{subid, SubId}|Opts], Msg, State) -> process_subopts([{subid, SubId}|Opts], Msg, State) ->
process_subopts(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State). process_subopts(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).
@ -1020,9 +1050,19 @@ drain_q(Cnt, Msgs, Q) ->
case emqx_mqueue:out(Q) of case emqx_mqueue:out(Q) of
{empty, _Q} -> {Msgs, Q}; {empty, _Q} -> {Msgs, Q};
{{value, Msg}, Q1} -> {{value, Msg}, Q1} ->
drain_q(Cnt-1, [Msg|Msgs], Q1) case emqx_message:is_expired(Msg) of
true ->
ok = emqx_metrics:inc('messages.expired'),
drain_q(Cnt, Msgs, Q1);
false ->
drain_q(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
end
end. end.
-compile({inline, [acc_cnt/2]}).
acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt;
acc_cnt(_Msg, Cnt) -> Cnt - 1.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Ensure timers %% Ensure timers
@ -1130,3 +1170,9 @@ do_subscribe(ClientId, Username, Topic, SubOpts, SubMap) ->
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]), ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
maps:put(Topic, SubOpts, SubMap) maps:put(Topic, SubOpts, SubMap)
end. end.
exceeded_subscription_quota(0, _SubMap) ->
false;
exceeded_subscription_quota(Max, SubMap) ->
maps:size(SubMap) >= Max.

View File

@ -117,7 +117,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
catch catch
_:Error:_Stk -> _:Error:_Stk ->
unregister_session(ClientId, SessPid), unregister_session(ClientId, SessPid),
?LOG(warning, "Failed to discard ~p: ~p", [SessPid, Error]) ?LOG(notice, "Failed to discard ~p: ~p", [SessPid, Error])
end end
end, lookup_session_pids(ClientId)). end, lookup_session_pids(ClientId)).

View File

@ -14,8 +14,6 @@
-module(emqx_topic). -module(emqx_topic).
-include("emqx_mqtt.hrl").
%% APIs %% APIs
-export([ match/2 -export([ match/2
, validate/1 , validate/1
@ -33,19 +31,23 @@
, parse/2 , parse/2
]). ]).
-export_type([ group/0
, topic/0
, word/0
, triple/0
]).
-type(group() :: binary()). -type(group() :: binary()).
-type(topic() :: binary()). -type(topic() :: binary()).
-type(word() :: '' | '+' | '#' | binary()). -type(word() :: '' | '+' | '#' | binary()).
-type(words() :: list(word())). -type(words() :: list(word())).
-opaque(triple() :: {root | binary(), word(), binary()}). -opaque(triple() :: {root | binary(), word(), binary()}).
-export_type([group/0, topic/0, word/0, triple/0]).
-define(MAX_TOPIC_LEN, 4096). -define(MAX_TOPIC_LEN, 4096).
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
%% APIs %% APIs
%%------------------------------------------------------------------------------ %%--------------------------------------------------------------------
%% @doc Is wildcard topic? %% @doc Is wildcard topic?
-spec(wildcard(topic() | words()) -> true | false). -spec(wildcard(topic() | words()) -> true | false).
@ -60,7 +62,7 @@ wildcard(['+'|_]) ->
wildcard([_H|T]) -> wildcard([_H|T]) ->
wildcard(T). wildcard(T).
%% @doc Match Topic name with filter %% @doc Match Topic name with filter.
-spec(match(Name, Filter) -> boolean() when -spec(match(Name, Filter) -> boolean() when
Name :: topic() | words(), Name :: topic() | words(),
Filter :: topic() | words()). Filter :: topic() | words()).
@ -68,7 +70,7 @@ match(<<$$, _/binary>>, <<$+, _/binary>>) ->
false; false;
match(<<$$, _/binary>>, <<$#, _/binary>>) -> match(<<$$, _/binary>>, <<$#, _/binary>>) ->
false; false;
match(Name, Filter) when is_binary(Name) and is_binary(Filter) -> match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
match(words(Name), words(Filter)); match(words(Name), words(Filter));
match([], []) -> match([], []) ->
true; true;
@ -95,13 +97,15 @@ validate({Type, Topic}) when Type =:= name; Type =:= filter ->
-spec(validate(name | filter, topic()) -> true). -spec(validate(name | filter, topic()) -> true).
validate(_, <<>>) -> validate(_, <<>>) ->
error(empty_topic); error(empty_topic);
validate(_, Topic) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) -> validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
error(topic_too_long); error(topic_too_long);
validate(filter, Topic) when is_binary(Topic) -> validate(filter, Topic) when is_binary(Topic) ->
validate2(words(Topic)); validate2(words(Topic));
validate(name, Topic) when is_binary(Topic) -> validate(name, Topic) when is_binary(Topic) ->
Words = words(Topic), Words = words(Topic),
validate2(Words) and (not wildcard(Words)). validate2(Words)
andalso (not wildcard(Words))
orelse error(topic_name_error).
validate2([]) -> validate2([]) ->
true; true;
@ -123,7 +127,7 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
validate3(<<_/utf8, Rest/binary>>) -> validate3(<<_/utf8, Rest/binary>>) ->
validate3(Rest). validate3(Rest).
%% @doc Topic to triples %% @doc Topic to triples.
-spec(triples(topic()) -> list(triple())). -spec(triples(topic()) -> list(triple())).
triples(Topic) when is_binary(Topic) -> triples(Topic) when is_binary(Topic) ->
triples(words(Topic), root, []). triples(words(Topic), root, []).
@ -206,27 +210,29 @@ join(Words) ->
end, {true, <<>>}, [bin(W) || W <- Words]), end, {true, <<>>}, [bin(W) || W <- Words]),
Bin. Bin.
-spec(parse(topic()) -> {topic(), #{}}). -spec(parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}).
parse(Topic) when is_binary(Topic) -> parse(TopicFilter) when is_binary(TopicFilter) ->
parse(Topic, #{}). parse(TopicFilter, #{});
parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
parse(TopicFilter, Options).
parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) -> -spec(parse(topic(), map()) -> {topic(), map()}).
error({invalid_topic, Topic}); parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
parse(Topic = <<?SHARE, "/", _/binary>>, #{share := _Group}) -> error({invalid_topic_filter, TopicFilter});
error({invalid_topic, Topic}); parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
parse(<<"$queue/", Topic1/binary>>, Options) -> error({invalid_topic_filter, TopicFilter});
parse(Topic1, maps:put(share, <<"$queue">>, Options)); parse(<<"$queue/", TopicFilter/binary>>, Options) ->
parse(Topic = <<?SHARE, "/", Topic1/binary>>, Options) -> parse(TopicFilter, Options#{share => <<"$queue">>});
case binary:split(Topic1, <<"/">>) of parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
[<<>>] -> error({invalid_topic, Topic}); case binary:split(Rest, <<"/">>) of
[_] -> error({invalid_topic, Topic}); [_Any] -> error({invalid_topic_filter, TopicFilter});
[Group, Topic2] -> [ShareName, Filter] ->
case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of case binary:match(ShareName, [<<"+">>, <<"#">>]) of
nomatch -> {Topic2, maps:put(share, Group, Options)}; nomatch -> parse(Filter, Options#{share => ShareName});
_ -> error({invalid_topic, Topic}) _ -> error({invalid_topic_filter, TopicFilter})
end end
end; end;
parse(Topic, Options = #{qos := QoS}) -> parse(TopicFilter, Options = #{qos := QoS}) ->
{Topic, Options#{rc => QoS}}; {TopicFilter, Options#{rc => QoS}};
parse(Topic, Options) -> parse(TopicFilter, Options) ->
{Topic, Options}. {TopicFilter, Options}.

View File

@ -14,34 +14,19 @@
-module(emqx_tracer). -module(emqx_tracer).
-behaviour(gen_server).
-include("emqx.hrl"). -include("emqx.hrl").
-include("logger.hrl"). -include("logger.hrl").
-logger_header("[Tracer]"). -logger_header("[Tracer]").
%% APIs %% APIs
-export([start_link/0]).
-export([ trace/2 -export([ trace/2
, start_trace/3 , start_trace/3
, lookup_traces/0 , lookup_traces/0
, stop_trace/1 , stop_trace/1
]). ]).
%% gen_server callbacks -type(trace_who() :: {client_id | topic, binary() | list()}).
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {traces}).
-type(trace_who() :: {client_id | topic, binary()}).
-define(TRACER, ?MODULE). -define(TRACER, ?MODULE).
-define(FORMAT, {emqx_logger_formatter, -define(FORMAT, {emqx_logger_formatter,
@ -55,65 +40,62 @@
[peername," "], [peername," "],
[]}]}, []}]},
msg,"\n"]}}). msg,"\n"]}}).
-define(TOPIC_TRACE_ID(T), "trace_topic_"++T).
-define(CLIENT_TRACE_ID(C), "trace_clientid_"++C).
-define(TOPIC_TRACE(T), {topic,T}).
-define(CLIENT_TRACE(C), {client_id,C}).
-define(is_log_level(L),
L =:= emergency orelse
L =:= alert orelse
L =:= critical orelse
L =:= error orelse
L =:= warning orelse
L =:= notice orelse
L =:= info orelse
L =:= debug).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% APIs %% APIs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
start_link() ->
gen_server:start_link({local, ?TRACER}, ?MODULE, [], []).
trace(publish, #message{topic = <<"$SYS/", _/binary>>}) -> trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
%% Dont' trace '$SYS' publish %% Do not trace '$SYS' publish
ignore; ignore;
trace(publish, #message{from = From, topic = Topic, payload = Payload}) trace(publish, #message{from = From, topic = Topic, payload = Payload})
when is_binary(From); is_atom(From) -> when is_binary(From); is_atom(From) ->
emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]). emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, "PUBLISH to ~s: ~p", [Topic, Payload]).
%%------------------------------------------------------------------------------
%% Start/Stop trace
%%------------------------------------------------------------------------------
%% @doc Start to trace client_id or topic. %% @doc Start to trace client_id or topic.
-spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}). -spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
start_trace({client_id, ClientId}, Level, LogFile) -> start_trace(Who, all, LogFile) ->
do_start_trace({client_id, ClientId}, Level, LogFile); start_trace(Who, debug, LogFile);
start_trace({topic, Topic}, Level, LogFile) -> start_trace(Who, Level, LogFile) ->
do_start_trace({topic, Topic}, Level, LogFile). case ?is_log_level(Level) of
true ->
do_start_trace(Who, Level, LogFile) ->
#{level := PrimaryLevel} = logger:get_primary_config(), #{level := PrimaryLevel} = logger:get_primary_config(),
try logger:compare_levels(log_level(Level), PrimaryLevel) of try logger:compare_levels(Level, PrimaryLevel) of
lt -> lt ->
{error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])}; {error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])};
_GtOrEq -> _GtOrEq ->
gen_server:call(?MODULE, {start_trace, Who, Level, LogFile}, 5000) install_trace_handler(Who, Level, LogFile)
catch catch
_:Error -> _:Error ->
{error, Error} {error, Error}
end;
false -> {error, {invalid_log_level, Level}}
end. end.
%% @doc Stop tracing client_id or topic. %% @doc Stop tracing client_id or topic.
-spec(stop_trace(trace_who()) -> ok | {error, term()}). -spec(stop_trace(trace_who()) -> ok | {error, term()}).
stop_trace({client_id, ClientId}) -> stop_trace(Who) ->
gen_server:call(?MODULE, {stop_trace, {client_id, ClientId}}); uninstall_trance_handler(Who).
stop_trace({topic, Topic}) ->
gen_server:call(?MODULE, {stop_trace, {topic, Topic}}).
%% @doc Lookup all traces %% @doc Lookup all traces
-spec(lookup_traces() -> [{Who :: trace_who(), LogFile :: string()}]). -spec(lookup_traces() -> [{Who :: trace_who(), LogFile :: string()}]).
lookup_traces() -> lookup_traces() ->
gen_server:call(?TRACER, lookup_traces). lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers()).
%%------------------------------------------------------------------------------ install_trace_handler(Who, Level, LogFile) ->
%% gen_server callbacks
%%------------------------------------------------------------------------------
init([]) ->
{ok, #state{traces = #{}}}.
handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = Traces}) ->
case logger:add_handler(handler_id(Who), logger_disk_log_h, case logger:add_handler(handler_id(Who), logger_disk_log_h,
#{level => Level, #{level => Level,
formatter => ?FORMAT, formatter => ?FORMAT,
@ -121,54 +103,37 @@ handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = T
config => #{type => halt, file => LogFile}, config => #{type => halt, file => LogFile},
filter_default => stop, filter_default => stop,
filters => [{meta_key_filter, filters => [{meta_key_filter,
{fun filter_by_meta_key/2, Who} }]}) of {fun filter_by_meta_key/2, Who}}]})
of
ok -> ok ->
?LOG(info, "Start trace for ~p", [Who]), ?LOG(info, "Start trace for ~p", [Who]);
{reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}};
{error, Reason} -> {error, Reason} ->
?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]), ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
{reply, {error, Reason}, State} {error, Reason}
end; end.
handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) -> uninstall_trance_handler(Who) ->
case maps:find(Who, Traces) of
{ok, _LogFile} ->
case logger:remove_handler(handler_id(Who)) of case logger:remove_handler(handler_id(Who)) of
ok -> ok ->
?LOG(info, "Stop trace for ~p", [Who]); ?LOG(info, "Stop trace for ~p", [Who]);
{error, Reason} -> {error, Reason} ->
?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]) ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]),
end, {error, Reason}
{reply, ok, State#state{traces = maps:remove(Who, Traces)}}; end.
error ->
{reply, {error, not_found}, State}
end;
handle_call(lookup_traces, _From, State = #state{traces = Traces}) -> filter_traces({Id, Level, Dst}, Acc) ->
{reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State}; case atom_to_list(Id) of
?TOPIC_TRACE_ID(T)->
[{?TOPIC_TRACE(T), {Level,Dst}} | Acc];
?CLIENT_TRACE_ID(C) ->
[{?CLIENT_TRACE(C), {Level,Dst}} | Acc];
_ -> Acc
end.
handle_call(Req, _From, State) -> handler_id(?TOPIC_TRACE(Topic)) ->
?LOG(error, "Unexpected call: ~p", [Req]), list_to_atom(?TOPIC_TRACE_ID(str(Topic)));
{reply, ignored, State}. handler_id(?CLIENT_TRACE(ClientId)) ->
list_to_atom(?CLIENT_TRACE_ID(str(ClientId))).
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handler_id({topic, Topic}) ->
list_to_atom("topic_" ++ binary_to_list(Topic));
handler_id({client_id, ClientId}) ->
list_to_atom("clientid_" ++ binary_to_list(ClientId)).
filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) -> filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
case maps:find(MetaKey, Meta) of case maps:find(MetaKey, Meta) of
@ -181,13 +146,6 @@ filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
_ -> ignore _ -> ignore
end. end.
log_level(emergency) -> emergency; str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
log_level(alert) -> alert; str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
log_level(critical) -> critical; str(Str) when is_list(Str) -> Str.
log_level(error) -> error;
log_level(warning) -> warning;
log_level(notice) -> notice;
log_level(info) -> info;
log_level(debug) -> debug;
log_level(all) -> debug;
log_level(_) -> throw(invalid_log_level).

View File

@ -70,6 +70,7 @@ receive_messages(Count, Msgs) ->
basic_test(_Config) -> basic_test(_Config) ->
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
ct:print("Basic test starting"), ct:print("Basic test starting"),
init_caps(),
{ok, C} = emqx_client:start_link(), {ok, C} = emqx_client:start_link(),
{ok, _} = emqx_client:connect(C), {ok, _} = emqx_client:connect(C),
{ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1), {ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1),
@ -81,6 +82,7 @@ basic_test(_Config) ->
ok = emqx_client:disconnect(C). ok = emqx_client:disconnect(C).
will_message_test(_Config) -> will_message_test(_Config) ->
init_caps(),
{ok, C1} = emqx_client:start_link([{clean_start, true}, {ok, C1} = emqx_client:start_link([{clean_start, true},
{will_topic, nth(3, ?TOPICS)}, {will_topic, nth(3, ?TOPICS)},
{will_payload, <<"client disconnected">>}, {will_payload, <<"client disconnected">>},
@ -99,10 +101,10 @@ will_message_test(_Config) ->
ct:print("Will message test succeeded"). ct:print("Will message test succeeded").
offline_message_queueing_test(_) -> offline_message_queueing_test(_) ->
init_caps(),
{ok, C1} = emqx_client:start_link([{clean_start, false}, {ok, C1} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c1">>}]), {client_id, <<"c1">>}]),
{ok, _} = emqx_client:connect(C1), {ok, _} = emqx_client:connect(C1),
{ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2), {ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
ok = emqx_client:disconnect(C1), ok = emqx_client:disconnect(C1),
{ok, C2} = emqx_client:start_link([{clean_start, true}, {ok, C2} = emqx_client:start_link([{clean_start, true},
@ -123,6 +125,7 @@ offline_message_queueing_test(_) ->
?assertEqual(3, length(receive_messages(3))). ?assertEqual(3, length(receive_messages(3))).
overlapping_subscriptions_test(_) -> overlapping_subscriptions_test(_) ->
init_caps(),
{ok, C} = emqx_client:start_link([]), {ok, C} = emqx_client:start_link([]),
{ok, _} = emqx_client:connect(C), {ok, _} = emqx_client:connect(C),
@ -163,6 +166,7 @@ overlapping_subscriptions_test(_) ->
redelivery_on_reconnect_test(_) -> redelivery_on_reconnect_test(_) ->
ct:print("Redelivery on reconnect test starting"), ct:print("Redelivery on reconnect test starting"),
init_caps(),
{ok, C1} = emqx_client:start_link([{clean_start, false}, {ok, C1} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c">>}]), {client_id, <<"c">>}]),
{ok, _} = emqx_client:connect(C1), {ok, _} = emqx_client:connect(C1),
@ -194,6 +198,7 @@ redelivery_on_reconnect_test(_) ->
dollar_topics_test(_) -> dollar_topics_test(_) ->
ct:print("$ topics test starting"), ct:print("$ topics test starting"),
init_caps(),
{ok, C} = emqx_client:start_link([{clean_start, true}, {ok, C} = emqx_client:start_link([{clean_start, true},
{keepalive, 0}]), {keepalive, 0}]),
{ok, _} = emqx_client:connect(C), {ok, _} = emqx_client:connect(C),
@ -205,3 +210,13 @@ dollar_topics_test(_) ->
?assertEqual(0, length(receive_messages(1))), ?assertEqual(0, length(receive_messages(1))),
ok = emqx_client:disconnect(C), ok = emqx_client:disconnect(C),
ct:print("$ topics test succeeded"). ct:print("$ topics test succeeded").
init_caps() ->
Caps = #{max_qos_allowed => 2,
max_topic_levels => 0,
mqtt_shared_subscription => true,
mqtt_wildcard_subscription => true,
max_topic_alias => 0,
mqtt_retain_available => true},
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
timer:sleep(100).

View File

@ -27,38 +27,19 @@ all() -> [t_get_set_caps, t_check_pub, t_check_sub].
t_get_set_caps(_) -> t_get_set_caps(_) ->
{ok, _} = emqx_zone:start_link(), {ok, _} = emqx_zone:start_link(),
Caps = #{
max_packet_size => ?MAX_PACKET_SIZE, PubCaps = emqx_mqtt_caps:get_caps(external, publish),
max_clientid_len => ?MAX_CLIENTID_LEN,
max_topic_alias => 0,
max_topic_levels => 0,
max_qos_allowed => ?QOS_2,
mqtt_retain_available => true,
mqtt_shared_subscription => true,
mqtt_wildcard_subscription => true
},
Caps2 = Caps#{max_packet_size => 1048576},
case emqx_mqtt_caps:get_caps(zone) of
Caps -> ok;
Caps2 -> ok
end,
PubCaps = #{
max_qos_allowed => ?QOS_2,
mqtt_retain_available => true,
max_topic_alias => 0
},
PubCaps = emqx_mqtt_caps:get_caps(zone, publish),
NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1}, NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1},
emqx_zone:set_env(zone, '$mqtt_pub_caps', NewPubCaps), [emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(NewPubCaps)],
timer:sleep(100), timer:sleep(100),
NewPubCaps = emqx_mqtt_caps:get_caps(zone, publish), NewPubCaps = emqx_mqtt_caps:get_caps(external, publish),
SubCaps = #{
max_topic_levels => 0, SubCaps = emqx_mqtt_caps:get_caps(external, subscribe),
max_qos_allowed => ?QOS_2, NewSubCaps = SubCaps#{max_topic_levels => 2},
mqtt_shared_subscription => true, [emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(NewSubCaps)],
mqtt_wildcard_subscription => true timer:sleep(100),
}, NewSubCaps = emqx_mqtt_caps:get_caps(external, subscribe),
SubCaps = emqx_mqtt_caps:get_caps(zone, subscribe),
emqx_zone:stop(). emqx_zone:stop().
t_check_pub(_) -> t_check_pub(_) ->
@ -68,35 +49,34 @@ t_check_pub(_) ->
mqtt_retain_available => false, mqtt_retain_available => false,
max_topic_alias => 4 max_topic_alias => 4
}, },
emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), [emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(PubCaps)],
timer:sleep(100), timer:sleep(100),
ct:log("~p", [emqx_mqtt_caps:get_caps(zone, publish)]), ct:log("~p", [emqx_mqtt_caps:get_caps(external, publish)]),
BadPubProps1 = #{ BadPubProps1 = #{
qos => ?QOS_2, qos => ?QOS_2,
retain => false retain => false
}, },
{error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps1), {error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(external, BadPubProps1),
BadPubProps2 = #{ BadPubProps2 = #{
qos => ?QOS_1, qos => ?QOS_1,
retain => true retain => true
}, },
{error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps2), {error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(external, BadPubProps2),
BadPubProps3 = #{ BadPubProps3 = #{
qos => ?QOS_1, qos => ?QOS_1,
retain => false, retain => false,
topic_alias => 5 topic_alias => 5
}, },
{error, ?RC_TOPIC_ALIAS_INVALID} = emqx_mqtt_caps:check_pub(zone, BadPubProps3), {error, ?RC_TOPIC_ALIAS_INVALID} = emqx_mqtt_caps:check_pub(external, BadPubProps3),
PubProps = #{ PubProps = #{
qos => ?QOS_1, qos => ?QOS_1,
retain => false retain => false
}, },
ok = emqx_mqtt_caps:check_pub(zone, PubProps), ok = emqx_mqtt_caps:check_pub(external, PubProps),
emqx_zone:stop(). emqx_zone:stop().
t_check_sub(_) -> t_check_sub(_) ->
{ok, _} = emqx_zone:start_link(), {ok, _} = emqx_zone:start_link(),
Opts = #{qos => ?QOS_2, share => true, rc => 0}, Opts = #{qos => ?QOS_2, share => true, rc => 0},
Caps = #{ Caps = #{
max_topic_levels => 0, max_topic_levels => 0,
@ -104,6 +84,8 @@ t_check_sub(_) ->
mqtt_shared_subscription => true, mqtt_shared_subscription => true,
mqtt_wildcard_subscription => true mqtt_wildcard_subscription => true
}, },
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
timer:sleep(100),
ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]), ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]),
ok = do_check_sub(Caps#{max_qos_allowed => ?QOS_1}, [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{qos => ?QOS_1}}]), ok = do_check_sub(Caps#{max_qos_allowed => ?QOS_1}, [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{qos => ?QOS_1}}]),
@ -122,10 +104,10 @@ t_check_sub(_) ->
do_check_sub(TopicFilters, Topics) -> do_check_sub(TopicFilters, Topics) ->
{ok, Topics} = emqx_mqtt_caps:check_sub(zone, TopicFilters), {ok, Topics} = emqx_mqtt_caps:check_sub(external, TopicFilters),
ok. ok.
do_check_sub(Caps, TopicFilters, Topics) -> do_check_sub(Caps, TopicFilters, Topics) ->
emqx_zone:set_env(zone, '$mqtt_sub_caps', Caps), [emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
timer:sleep(100), timer:sleep(100),
{_, Topics} = emqx_mqtt_caps:check_sub(zone, TopicFilters), {_, Topics} = emqx_mqtt_caps:check_sub(external, TopicFilters),
ok. ok.

View File

@ -56,7 +56,7 @@ groups() ->
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([], fun set_special_configs/1), emqx_ct_helpers:start_apps([], fun set_special_configs/1),
MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()), MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()),
emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}), [emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(MqttCaps#{max_topic_alias => 20})],
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->

View File

@ -21,7 +21,7 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
all() -> [ignore_loop, t_session_all]. all() -> [ignore_loop, t_session_all, t_message_expiry_interval_1, t_message_expiry_interval_2].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([]), emqx_ct_helpers:start_apps([]),
@ -66,3 +66,63 @@ t_session_all(_) ->
timer:sleep(200), timer:sleep(200),
[] = emqx:subscriptions(SPid), [] = emqx:subscriptions(SPid),
emqx_mock_client:close_session(ConnPid). emqx_mock_client:close_session(ConnPid).
t_message_expiry_interval_1(_) ->
ClientA = message_expiry_interval_init(),
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]].
t_message_expiry_interval_2(_) ->
ClientA = message_expiry_interval_init(),
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]].
message_expiry_interval_init() ->
{ok, ClientA} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, ClientB} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqx_client:connect(ClientA),
{ok, _} = emqx_client:connect(ClientB),
%% subscribe and disconnect client-b
emqx_client:subscribe(ClientB, <<"t/a">>, 1),
emqx_client:stop(ClientB),
ClientA.
message_expiry_interval_exipred(ClientA, QoS) ->
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
%% publish to t/a and waiting for the message expired
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
ct:sleep(2000),
%% resume the session for client-b
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqx_client:connect(ClientB1),
%% verify client-b could not receive the publish message
receive
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
ct:fail(should_have_expired)
after 300 ->
ok
end,
emqx_client:stop(ClientB1).
message_expiry_interval_not_exipred(ClientA, QoS) ->
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
%% publish to t/a
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
%% wait for 1s and then resume the session for client-b, the message should not expires
%% as Message-Expiry-Interval = 20s
ct:sleep(1000),
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqx_client:connect(ClientB1),
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
receive
{publish,#{client_pid := ClientB1, topic := <<"t/a">>,
properties := #{'Message-Expiry-Interval' := MsgExpItvl}}}
when MsgExpItvl < 20 -> ok;
{publish, _} = Msg ->
ct:fail({incorrect_publish, Msg})
after 300 ->
ct:fail(no_publish_received)
end,
emqx_client:stop(ClientB1).

View File

@ -43,7 +43,7 @@ start_traces(_Config) ->
emqx_logger:set_log_level(debug), emqx_logger:set_log_level(debug),
ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"), ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"), ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
{error, invalid_log_level} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"), {error, {invalid_log_level, bad_level}} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"),
ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"), ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
ct:sleep(100), ct:sleep(100),
@ -53,9 +53,9 @@ start_traces(_Config) ->
?assert(filelib:is_regular("tmp/topic_trace.log")), ?assert(filelib:is_regular("tmp/topic_trace.log")),
%% Get current traces %% Get current traces
?assertEqual([{{client_id,<<"client">>},{debug,"tmp/client.log"}}, ?assertEqual([{{client_id,"client"},{debug,"tmp/client.log"}},
{{client_id,<<"client2">>},{all,"tmp/client2.log"}}, {{client_id,"client2"},{debug,"tmp/client2.log"}},
{{topic,<<"a/#">>},{all,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()), {{topic,"a/#"},{debug,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
%% set the overall log level to debug %% set the overall log level to debug
emqx_logger:set_log_level(debug), emqx_logger:set_log_level(debug),