Compare commits

...

50 Commits

Author SHA1 Message Date
JianBo He f4cdaf78ed Fix(connect): fix the race condition for openning session
- Remove the register_channel/1,2 functions
2020-06-02 17:50:54 +08:00
terry-xiaoyu 63b5b4f588 Add log overload protection parameters 2020-05-28 21:21:01 +08:00
turtleDeng 0f5bc86dff
Merge pull request #3496 from emqx/force_subscribe_4.0
Subscribe or unsubscribe via HTTP API skip ACL checking
2020-05-27 13:32:53 +08:00
zhouzb 1f964d5a7c Subscribe or unsubscribe via HTTP API skip ACL checking 2020-05-27 10:57:44 +08:00
turtleDeng 36f9cf0ac9
Merge pull request #3478 from emqx/fix_ts
Add timestamp for alarm
2020-05-22 17:37:03 +08:00
zhouzb 56dccd2ce5 Add timestamp for alarm 2020-05-22 17:27:52 +08:00
zhouzb 997b0018f5 Fix test case 2020-05-22 13:37:10 +08:00
zhouzb c536757e1f Fix unexpected packet before connect packet 2020-05-22 13:37:10 +08:00
zhouzb 4290cdf6f0 Add timestamp for alarm 2020-05-22 13:36:11 +08:00
JianBo He 077b5f6c7d Upgrade ekka to 0.7.3 2020-05-22 09:28:45 +08:00
turtleDeng d6ab5f0703
Merge pull request #3458 from emqx/issue#3455 2020-05-12 10:34:43 +08:00
turtleDeng f5ed95ba17
Merge pull request #3448 from emqx/update_certs_v4
Update certs v4
2020-05-12 10:34:09 +08:00
zhouzb 36b8765009 Fix case_clause in issue#3455 2020-05-11 11:13:39 +08:00
turtleDeng 71f01279c6
Fix test cases fail 2020-05-08 20:14:05 +08:00
zhouzb 60562a6afe Update certs 2020-05-08 18:20:31 +08:00
zhouzb 9cf61bfb2e Update certs 2020-05-08 16:17:33 +08:00
JianBo He 0ebd36b011 Get client's addr/port from proxy header if enable proxy_protocol 2020-04-22 17:34:36 +08:00
turtleDeng 909efa2020
Merge pull request #3407 from emqx/fix_flapping_4.0
Delete expired client
2020-04-22 17:30:18 +08:00
zhouzb 09197d3775 Delete expired client 2020-04-22 15:08:14 +08:00
turtleDeng 545ba905dc
Merge pull request #3328 from emqx/fix_json_encode_for40
Fix emqx_json encode return types error
2020-03-21 14:33:44 +08:00
JianBo He 5e02c569f2 Fix emqx_json encode return types error 2020-03-21 13:41:30 +08:00
turtleDeng 4ba3c343f0
Merge pull request #3322 from emqx/master
Auto-pull-request-by-2020-03-17
2020-03-17 18:11:50 +08:00
turtleDeng f184973ab2
Merge pull request #3319 from emqx/master
Change global_gc to major mode (#3317)
2020-03-17 14:30:13 +08:00
turtleDeng 531d1cf0a3
Merge pull request #3314 from emqx/master
Auto-pull-request-by-2020-03-16
2020-03-16 20:18:30 +08:00
JianBo He 3256f51444
Merge pull request #3295 from emqx/master
Auto-pull-request-by-2020-03-06
2020-03-06 11:04:59 +08:00
JianBo He 014e231378
Merge pull request #3261 from emqx/master 2020-02-21 15:39:46 +08:00
turtleDeng 43c29c5330
Merge pull request #3240 from emqx/master
Receive for the EXIT message on test cases
2020-02-07 17:08:59 +08:00
turtleDeng fe94acffec
Merge pull request #3238 from emqx/master
Auto-pull-request-by-2020-02-07
2020-02-07 16:11:37 +08:00
turtleDeng 7a1d22e79f
Merge pull request #3202 from emqx/master
Auto-pull-request-by-2020-01-17
2020-01-17 19:49:48 +08:00
zhouzb 227f8c74b1 Fix bad setting 2020-01-17 10:11:38 +08:00
zhouzb 3ee8d5c5ca Fix rap handling and keep the value of retain flag in bridge mode 2020-01-16 16:49:13 +08:00
turtled 66a7845bdc Hotfix #3187 2020-01-16 15:26:56 +08:00
turtleDeng 050409b653
Merge pull request #3171 from emqx/master
Auto-pull-request-by-2020-01-10
2020-01-10 19:04:41 +08:00
turtleDeng 221978e6c3
Merge pull request #3170 from emqx/master
Update mqtt_protocol_v5_SUITE.erl
2020-01-10 17:49:50 +08:00
turtleDeng ed296498fe
Merge pull request #3167 from emqx/master
Auto-pull-request-by-2020-01-10
2020-01-10 17:48:01 +08:00
turtleDeng c818b22bd4
Merge pull request #3147 from emqx/master
Auto-pull-request-by-2019-12-31
2019-12-31 13:20:31 +08:00
turtleDeng c57c037c53
Merge pull request #3130 from emqx/master
Auto-pull-request-by-2019-12-21
2019-12-21 18:21:12 +08:00
turtleDeng a3c228e5e3
Merge pull request #3113 from emqx/master
Auto-pull-request-by-2019-12-16
2019-12-16 22:59:01 +08:00
turtleDeng 0b6111a7dc
Merge pull request #3084 from emqx/master
Auto-pull-request-by-2019-12-07
2019-12-07 19:57:31 +08:00
tigercl 8f5a0a8cbd
Merge pull request #3043 from emqx/master
Auto-pull-request-by-2019-11-18
2019-11-18 17:27:23 +08:00
turtleDeng a7eabdb0b1
Merge pull request #3018 from emqx/master
Auto-pull-request-by-2019-11-02
2019-11-02 00:34:45 +08:00
Shawn fee9a7228b
Merge pull request #2975 from emqx/master
Auto-pull-request-by-2019-10-14
2019-10-14 21:54:57 +08:00
tigercl 4c7ab6a5d7
Merge pull request #2947 from emqx/master
Auto-pull-request-by-2019-09-30
2019-09-30 10:35:28 +08:00
turtleDeng 8f0e16e119
Merge pull request #2924 from emqx/master
Auto-pull-request-by-2019-09-21
2019-09-21 16:28:57 +08:00
turtleDeng a449cb565b
Merge pull request #2905 from emqx/master
Auto-pull-request-by-2019-09-17
2019-09-16 20:37:58 +08:00
turtleDeng ee0a76f89d
Merge pull request #2879 from emqx/master
Version 4.0-alpha.2
2019-09-06 20:44:12 +08:00
turtleDeng 893f1c1f3c
Merge pull request #2876 from emqx/master
Auto-pull-request-by-2019-09-07
2019-09-06 19:36:35 +08:00
turtleDeng e4a5121e86
Merge pull request #2839 from emqx/master
Auto-pull-request-by-2019-08-24
2019-08-23 20:06:23 +08:00
turtled 303c2414e5 Fix conflicts 2019-08-10 02:20:38 +08:00
Gilbert 3ebd06a0fa
Update gen_rpc (#2717) 2019-07-23 14:48:44 +08:00
18 changed files with 512 additions and 201 deletions

View File

@ -1,18 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIC0TCCAbmgAwIBAgIUDQN8HojZmyEV9+AzEz6j6juwThswDQYJKoZIhvcNAQEL
BQAwEzERMA8GA1UEAwwITXlUZXN0Q0EwHhcNMTkxMTE1MDcyNjU4WhcNMjkxMTEy
MDcyNjU4WjATMREwDwYDVQQDDAhNeVRlc3RDQTCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBALce8QYBpl7fxEhwW0wtBQygXisMcPTKzckz3RhU21TeqK1Z
6Fm03QyYvB239oYJLodVwzv5SNI75hZ43Vyp+SHt3M3DjcsU/8PflxFK4QR7TdhI
ddn6R59Gqt0MhAZ/df2dYt7cMaQV8/5plzxLvrv9X2fwo8BYAGp6g6wGAL8SJDT9
jd9TGzBG/o3dLu3keEwcl0CMq3qUwxatBHMe2s7COKBrngD/CvRAL8tG3VTj7ep9
n29SSS8qMzHhJdBahTDrYS+SeW61iFK1yLXSxCWNoMB0/g7/AktWuAXHdHRX9xaf
WNJ4RdoPxhqkVJ8SrC4JtC8ah6DchVysWnz2KwMCAwEAAaMdMBswDAYDVR0TBAUw
AwEB/zALBgNVHQ8EBAMCAQYwDQYJKoZIhvcNAQELBQADggEBAEgnPnHLdivykReJ
I8xf5DeWsgBUdVvhxz2E9Ole/u6ThulNLziwHernkTprskiKFJaF67ZzS7YddTdf
WsS0H5LhYaft5NnBcn9UHCKEycyr3AJZ6joB3Dd9CfMQEscnZHNmIXwPGxw4bYP6
AElF0Iy7LY/Z8po/UACTBzCCSf5UkZ9Jy/rzxuvn/cfPcLNhDWk8b8MbmOfuyNPV
SfPGn7wXIt9iyyA4qyzEVMaXl8d94E48dV5Fc1sQEEo6gk16dQ9p64ePMvUih6an
kSz9X/n1+9sHq54pJmLZ2gfRvGPIPVIipSjAj4sjHvKzuC3CQTTXs9HzmN2nT0zx
gLxgEkY=
MIIDUTCCAjmgAwIBAgIJAPPYCjTmxdt/MA0GCSqGSIb3DQEBCwUAMD8xCzAJBgNV
BAYTAkNOMREwDwYDVQQIDAhoYW5nemhvdTEMMAoGA1UECgwDRU1RMQ8wDQYDVQQD
DAZSb290Q0EwHhcNMjAwNTA4MDgwNjUyWhcNMzAwNTA2MDgwNjUyWjA/MQswCQYD
VQQGEwJDTjERMA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UE
AwwGUm9vdENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzcgVLex1
EZ9ON64EX8v+wcSjzOZpiEOsAOuSXOEN3wb8FKUxCdsGrsJYB7a5VM/Jot25Mod2
juS3OBMg6r85k2TWjdxUoUs+HiUB/pP/ARaaW6VntpAEokpij/przWMPgJnBF3Ur
MjtbLayH9hGmpQrI5c2vmHQ2reRZnSFbY+2b8SXZ+3lZZgz9+BaQYWdQWfaUWEHZ
uDaNiViVO0OT8DRjCuiDp3yYDj3iLWbTA/gDL6Tf5XuHuEwcOQUrd+h0hyIphO8D
tsrsHZ14j4AWYLk1CPA6pq1HIUvEl2rANx2lVUNv+nt64K/Mr3RnVQd9s8bK+TXQ
KGHd2Lv/PALYuwIDAQABo1AwTjAdBgNVHQ4EFgQUGBmW+iDzxctWAWxmhgdlE8Pj
EbQwHwYDVR0jBBgwFoAUGBmW+iDzxctWAWxmhgdlE8PjEbQwDAYDVR0TBAUwAwEB
/zANBgkqhkiG9w0BAQsFAAOCAQEAGbhRUjpIred4cFAFJ7bbYD9hKu/yzWPWkMRa
ErlCKHmuYsYk+5d16JQhJaFy6MGXfLgo3KV2itl0d+OWNH0U9ULXcglTxy6+njo5
CFqdUBPwN1jxhzo9yteDMKF4+AHIxbvCAJa17qcwUKR5MKNvv09C6pvQDJLzid7y
E2dkgSuggik3oa0427KvctFf8uhOV94RvEDyqvT5+pgNYZ2Yfga9pD/jjpoHEUlo
88IGU8/wJCx3Ds2yc8+oBg/ynxG8f/HmCC1ET6EHHoe2jlo8FpU/SgGtghS1YL30
IWxNsPrUP+XsZpBJy/mvOhE5QXo6Y35zDqqj8tI7AGmAWu22jg==
-----END CERTIFICATE-----

View File

@ -1,18 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
c3RDQTAeFw0xOTExMTUwNzI2NThaFw0yMTExMTQwNzI2NThaMCoxFzAVBgNVBAMM
DjAwMDQubm92YWxvY2FsMQ8wDQYDVQQKDAZzZXJ2ZXIwggEiMA0GCSqGSIb3DQEB
AQUAA4IBDwAwggEKAoIBAQDC5JE48PJ/BFTLEseEbrGIdYB6w29hme4KFKmAqlLQ
kpwwZJAsm/9iuXy6svJf7Tzzc173Jkgzw7DzhzSf1VgRDrOCQS+IU6s8UXfUMJt/
AmP1SkU2mUJ/+pnEGRKtVkF9LCScinI95Iwt3xngdjMYXwk+S9Le3/8782ClBwZG
vffXQ7hd5HnShgyqFVePgrKmr879NTylfvAWPwux2kdXNnbOHIrhcZm0NeMNf7hs
UNURFlqo4rA0FV9dIHMryPkM7ygoaMog2XmcCnq/jf/MfPTQPYjQ9iLPOGrYi0pY
X12uFb55duRGsvs7MIkNc8fn2VERoC69QX+GK+zAUGZ/AgMBAAGjLzAtMAkGA1Ud
EwQCMAAwCwYDVR0PBAQDAgUgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA0GCSqGSIb3
DQEBCwUAA4IBAQBpW7Ge5duo6/u3xIl0XhG/2dlSwlUUpO3Ecc13gmh44nJR66VH
BEiimsol6gIgcSTk4pVY1DLb/09Nwv0TILl3Dc4QtXhM4gIlNRR79mLVsnPTef5e
xkmesQaLihSCroHq8bONnO/Xgj5hCg8uI4j3vHtOikjABxQPOrCfc2uSrenU7aol
1HBijCY6R+pg6WxBOZ2Teiaoxjn78IxSKLXW0pLRJIPpet1hefR0sKkmPfVGyg8H
g7hqo+Houw8PQf2HLZnU656vyTlgIh6ES1x7Plb0cIw/LGr4rMkXs+DFg9SLbetT
ncT4plfucsek7ImN9Dw2w2hM2FZwB8ycZfmu
MIIDEzCCAfugAwIBAgIBAjANBgkqhkiG9w0BAQsFADA/MQswCQYDVQQGEwJDTjER
MA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UEAwwGUm9vdENB
MB4XDTIwMDUwODA4MDcwNVoXDTMwMDUwNjA4MDcwNVowPzELMAkGA1UEBhMCQ04x
ETAPBgNVBAgMCGhhbmd6aG91MQwwCgYDVQQKDANFTVExDzANBgNVBAMMBlNlcnZl
cjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALNeWT3pE+QFfiRJzKmn
AMUrWo3K2j/Tm3+Xnl6WLz67/0rcYrJbbKvS3uyRP/stXyXEKw9CepyQ1ViBVFkW
Aoy8qQEOWFDsZc/5UzhXUnb6LXr3qTkFEjNmhj+7uzv/lbBxlUG1NlYzSeOB6/RT
8zH/lhOeKhLnWYPXdXKsa1FL6ij4X8DeDO1kY7fvAGmBn/THh1uTpDizM4YmeI+7
4dmayA5xXvARte5h4Vu5SIze7iC057N+vymToMk2Jgk+ZZFpyXrnq+yo6RaD3ANc
lrc4FbeUQZ5a5s5Sxgs9a0Y3WMG+7c5VnVXcbjBRz/aq2NtOnQQjikKKQA8GF080
BQkCAwEAAaMaMBgwCQYDVR0TBAIwADALBgNVHQ8EBAMCBeAwDQYJKoZIhvcNAQEL
BQADggEBAJefnMZpaRDHQSNUIEL3iwGXE9c6PmIsQVE2ustr+CakBp3TZ4l0enLt
iGMfEVFju69cO4oyokWv+hl5eCMkHBf14Kv51vj448jowYnF1zmzn7SEzm5Uzlsa
sqjtAprnLyof69WtLU1j5rYWBuFX86yOTwRAFNjm9fvhAcrEONBsQtqipBWkMROp
iUYMkRqbKcQMdwxov+lHBYKq9zbWRoqLROAn54SRqgQk6c15JdEfgOOjShbsOkIH
UhqcwRkQic7n1zwHVGVDgNIZVgmJ2IdIWBlPEC7oLrRrBD/X1iEEXtKab6p5o22n
KB5mN+iQaE+Oe2cpGKZJiJRdM+IqDDQ=
-----END CERTIFICATE-----

View File

@ -1,18 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIC5zCCAc+gAwIBAgIBAjANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
c3RDQTAeFw0xOTExMTUwNzI2NThaFw0yMTExMTQwNzI2NThaMCoxFzAVBgNVBAMM
DjAwMDQubm92YWxvY2FsMQ8wDQYDVQQKDAZjbGllbnQwggEiMA0GCSqGSIb3DQEB
AQUAA4IBDwAwggEKAoIBAQDcwo5SaoRpzkqy+Y9OADOL7U84h1VFfjb5Uu5raenO
elmHSaCZpVP2EsDUaWavtabHd9fa5Oq6lOyZPDZM6xttfi78EV4RRfEJ4XdvE54W
MZSDAGz4RwxfGOQWBSFyp1NrzT32eqeDSyBrE3jhWx9UUUMwthg5YYjCdBwK+Dwf
hsfS1YeAfXPNO/BGSTe0dPhjLztXe1BkFO5VAwkSXaPs2lBJddOgpTTLXQ3+hIPL
ozkiaTOMOvIMXsCspdhJbSc+jAAGZT5X9Tx7htYbPXIwyDJgeYGmLtr9XxPJ8XGR
rpxkB3zASRcwQzsxTcwkG4E32T53tKsljTkNt15rIoo3AgMBAAGjLzAtMAkGA1Ud
EwQCMAAwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMCMA0GCSqGSIb3
DQEBCwUAA4IBAQBRtQMvUmiB84RmrGwHCP8hcGUWTz03mtTjGrykNA7YQkA09cRl
RwiqYMWh6zHjdX1Ri3m9eIi/QSK/JX3S9zjZU9dSTtsdnMhkRL08kcxauv9gVXCG
G1Vf+lUVJxTqwuAmcLiDNg9/89sSlxQXFS7Jn9TwTvNiRoFoN5IiJ4LsXyr4uS9Y
S4Ul1aqetwpTV8bjpIbRJbOR8qBFshIZOPdgAT3RqbD/vpGzOvvV0c9g3VFLYoK3
nQ63w1zhwYxC4MQD9rN7JRAKCDQBLNzf8PW0RSG9pVsf1IjaLxtsmQMgrAati/Ux
AG76LAn9sodtb4GtV8E9ITG0pMNlJyUovstS
MIIDEzCCAfugAwIBAgIBATANBgkqhkiG9w0BAQsFADA/MQswCQYDVQQGEwJDTjER
MA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UEAwwGUm9vdENB
MB4XDTIwMDUwODA4MDY1N1oXDTMwMDUwNjA4MDY1N1owPzELMAkGA1UEBhMCQ04x
ETAPBgNVBAgMCGhhbmd6aG91MQwwCgYDVQQKDANFTVExDzANBgNVBAMMBkNsaWVu
dDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMy4hoksKcZBDbY680u6
TS25U51nuB1FBcGMlF9B/t057wPOlxF/OcmbxY5MwepS41JDGPgulE1V7fpsXkiW
1LUimYV/tsqBfymIe0mlY7oORahKji7zKQ2UBIVFhdlvQxunlIDnw6F9popUgyHt
dMhtlgZK8oqRwHxO5dbfoukYd6J/r+etS5q26sgVkf3C6dt0Td7B25H9qW+f7oLV
PbcHYCa+i73u9670nrpXsC+Qc7Mygwa2Kq/jwU+ftyLQnOeW07DuzOwsziC/fQZa
nbxR+8U9FNftgRcC3uP/JMKYUqsiRAuaDokARZxVTV5hUElfpO6z6/NItSDvvh3i
eikCAwEAAaMaMBgwCQYDVR0TBAIwADALBgNVHQ8EBAMCBeAwDQYJKoZIhvcNAQEL
BQADggEBABchYxKo0YMma7g1qDswJXsR5s56Czx/I+B41YcpMBMTrRqpUC0nHtLk
M7/tZp592u/tT8gzEnQjZLKBAhFeZaR3aaKyknLqwiPqJIgg0pgsBGITrAK3Pv4z
5/YvAJJKgTe5UdeTz6U4lvNEux/4juZ4pmqH4qSFJTOzQS7LmgSmNIdd072rwXBd
UzcSHzsJgEMb88u/LDLjj1pQ7AtZ4Tta8JZTvcgBFmjB0QUi6fgkHY6oGat/W4kR
jSRUBlMUbM/drr2PVzRc2dwbFIl3X+ZE6n5Sl3ZwRAC/s92JU6CPMRW02muVu6xl
goraNgPISnrbpR6KjxLZkVembXzjNNc=
-----END CERTIFICATE-----

View File

@ -1,27 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA3MKOUmqEac5KsvmPTgAzi+1POIdVRX42+VLua2npznpZh0mg
maVT9hLA1Glmr7Wmx3fX2uTqupTsmTw2TOsbbX4u/BFeEUXxCeF3bxOeFjGUgwBs
+EcMXxjkFgUhcqdTa8099nqng0sgaxN44VsfVFFDMLYYOWGIwnQcCvg8H4bH0tWH
gH1zzTvwRkk3tHT4Yy87V3tQZBTuVQMJEl2j7NpQSXXToKU0y10N/oSDy6M5Imkz
jDryDF7ArKXYSW0nPowABmU+V/U8e4bWGz1yMMgyYHmBpi7a/V8TyfFxka6cZAd8
wEkXMEM7MU3MJBuBN9k+d7SrJY05DbdeayKKNwIDAQABAoIBAC6ww3Mw7iKGrAvg
dmuz5TMSFPBKx0E0aaIf5Sc4tmeiPu87Jkl4yyI/YyNJy5scG1MSyMeWJQMjXksm
jgGEtD9bMcrETZXvqgRB+IW4q3XcNKHkZCe6tyYh2JPDsAhU1XL2bMWFuYouSIP9
EVLwd9bYfRJ/YO4577fY4Nl9GRI9hdOB0Y4dDvxHCprxXC/wH6NpvI5dktTPr2xl
pNqABKdG8XEzP0duIpQf5zXbfDAWRUEpB9MDBXqmKmdjdPnpNS7JtkmCtWogdA9F
LcyFI3e86qB9HHaqq1hBsQEG/DYj0RxCcAQFqTfvpxmZOXDlfWdB7M8xnqkD5xT0
s6K1TXECgYEA79Lx5FFxfkN/uZKQzV1slJ/GSyfJqKhRh38/8G1ncmSG5dh0QMht
Tt7FbFhYwGZQY9iMq1g/ujlHAzdKbFHGRX0z30xP7kf0R/L0W2yHMq59Ys4nUhGY
o1v2sGxgDDP9XPNm/MV8DCZcoLMxvvFrfWLMYcvWTJb8TBGQgqpcEF8CgYEA66Zu
d+l2W5LSTgwYeIAQuiIhhNLY9Ct94TWrum5QZMdeR+IUYn+dT817Qbmf4KiiihfJ
V8t3tYgBBamNpqMKpm7An+HnFgRoV3o8W0pjlKdaQ0EiwhTQsLJcmZ1JV/k9Dd4V
Rl26M0DZRKTHIUWLt7nNYexydQpfWlfRX1/n9SkCgYAKphUzjCI79wdO2CEx3Tob
B1UotSWRJZgpKg9Ov6zeOXR79DaFQeEIpX+ipfGa6XAcXtswKIT74dszW1skoCTr
pPmOqrbJ38wK/dC31oPSTkkm//xi+oEKj+TORKGnKQ/Q9sXV53bwmyt1vz8wOUwK
jz6AASsMz494WTdPdf0MhQKBgQDGoBos6JPiy/aH4podt5Rhz7MBCdfkt2P7GAoP
sjwBNiq53E3iWD54rXJfC98+teWLEFGdttrIIEL8StYixvqLHn8uRHNLk5t/YIDP
UfxtqEHkvlpVzMW6qhxzPqg7htF3huHX1djEqrx3p4xQ9xW1Xt9G0s4G6R9GPw8z
nNsfQQKBgF1nvj5xhD7fiVzS7NrjtBslDxKGQCfs9f1Xl2eadGp6pgwG/hvw5oO7
gtoYJuPq+Zu92a+UDQVErXMHiXn3iza/3EOf2BP9zbq9mBGZtKmLExP0QGEmDygb
Yo18YdfwWwqxvEf+jt2URv0w+KNWL/3j5rDmngNa1iNubX3p1AK6
MIIEpAIBAAKCAQEAzLiGiSwpxkENtjrzS7pNLblTnWe4HUUFwYyUX0H+3TnvA86X
EX85yZvFjkzB6lLjUkMY+C6UTVXt+mxeSJbUtSKZhX+2yoF/KYh7SaVjug5FqEqO
LvMpDZQEhUWF2W9DG6eUgOfDoX2milSDIe10yG2WBkryipHAfE7l1t+i6Rh3on+v
561LmrbqyBWR/cLp23RN3sHbkf2pb5/ugtU9twdgJr6Lve73rvSeulewL5BzszKD
BrYqr+PBT5+3ItCc55bTsO7M7CzOIL99BlqdvFH7xT0U1+2BFwLe4/8kwphSqyJE
C5oOiQBFnFVNXmFQSV+k7rPr80i1IO++HeJ6KQIDAQABAoIBAGWgvPjfuaU3qizq
uti/FY07USz0zkuJdkANH6LiSjlchzDmn8wJ0pApCjuIE0PV/g9aS8z4opp5q/gD
UBLM/a8mC/xf2EhTXOMrY7i9p/I3H5FZ4ZehEqIw9sWKK9YzC6dw26HabB2BGOnW
5nozPSQ6cp2RGzJ7BIkxSZwPzPnVTgy3OAuPOiJytvK+hGLhsNaT+Y9bNDvplVT2
ZwYTV8GlHZC+4b2wNROILm0O86v96O+Qd8nn3fXjGHbMsAnONBq10bZS16L4fvkH
5G+W/1PeSXmtZFppdRRDxIW+DWcXK0D48WRliuxcV4eOOxI+a9N2ZJZZiNLQZGwg
w3A8+mECgYEA8HuJFrlRvdoBe2U/EwUtG74dcyy30L4yEBnN5QscXmEEikhaQCfX
Wm6EieMcIB/5I5TQmSw0cmBMeZjSXYoFdoI16/X6yMMuATdxpvhOZGdUGXxhAH+x
xoTUavWZnEqW3fkUU71kT5E2f2i+0zoatFESXHeslJyz85aAYpP92H0CgYEA2e5A
Yozt5eaA1Gyhd8SeptkEU4xPirNUnVQHStpMWUb1kzTNXrPmNWccQ7JpfpG6DcYl
zUF6p6mlzY+zkMiyPQjwEJlhiHM2NlL1QS7td0R8ewgsFoyn8WsBI4RejWrEG9td
EDniuIw+pBFkcWthnTLHwECHdzgquToyTMjrBB0CgYEA28tdGbrZXhcyAZEhHAZA
Gzog+pKlkpEzeonLKIuGKzCrEKRecIK5jrqyQsCjhS0T7ZRnL4g6i0s+umiV5M5w
fcc292pEA1h45L3DD6OlKplSQVTv55/OYS4oY3YEJtf5mfm8vWi9lQeY8sxOlQpn
O+VZTdBHmTC8PGeTAgZXHZUCgYA6Tyv88lYowB7SN2qQgBQu8jvdGtqhcs/99GCr
H3N0I69LPsKAR0QeH8OJPXBKhDUywESXAaEOwS5yrLNP1tMRz5Vj65YUCzeDG3kx
gpvY4IMp7ArX0bSRvJ6mYSFnVxy3k174G3TVCfksrtagHioVBGQ7xUg5ltafjrms
n8l55QKBgQDVzU8tQvBVqY8/1lnw11Vj4fkE/drZHJ5UkdC1eenOfSWhlSLfUJ8j
ds7vEWpRPPoVuPZYeR1y78cyxKe1GBx6Wa2lF5c7xjmiu0xbRnrxYeLolce9/ntp
asClqpnHT8/VJYTD7Kqj0fouTTZf0zkig/y+2XERppd8k+pSKjUCPQ==
-----END RSA PRIVATE KEY-----

View File

@ -1,27 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAwuSROPDyfwRUyxLHhG6xiHWAesNvYZnuChSpgKpS0JKcMGSQ
LJv/Yrl8urLyX+0883Ne9yZIM8Ow84c0n9VYEQ6zgkEviFOrPFF31DCbfwJj9UpF
NplCf/qZxBkSrVZBfSwknIpyPeSMLd8Z4HYzGF8JPkvS3t//O/NgpQcGRr3310O4
XeR50oYMqhVXj4Kypq/O/TU8pX7wFj8LsdpHVzZ2zhyK4XGZtDXjDX+4bFDVERZa
qOKwNBVfXSBzK8j5DO8oKGjKINl5nAp6v43/zHz00D2I0PYizzhq2ItKWF9drhW+
eXbkRrL7OzCJDXPH59lREaAuvUF/hivswFBmfwIDAQABAoIBAQCYa9gj11Vf/0wt
kh9WNJhGJ9d2q5hVleR0H9q9FPg1xSPAOTYEnXBrjrO89CzY1xq/L7DKzDbVvSuM
GmcOxfTdSkkcCs0Y6o7WWsTDv8ws1frFIPPmkpBOtPhDRHS1+eq38akkgKZ+P1te
mMiNIwQtAE6jWPuvcTIVee9QwaCn+5ZYIwICORNFoLsl7sKdLOfccSO7v9L/Az5r
AT4xrJwpKl5MjOGzOxFv6M1rTh/Y9e17U+2/QQDnW4U7C4/gkQ1urJddaeDDnz8t
GLAnshCdF8eL3vAKO6sMJiEGuVe3b2oBYrRjp7FSB1uLWWlFRb7TGE07UXP0JZDn
W1lmUbcBAoGBAO4/37Obk1pM6GQzS49AwLJtz5Z9DpxMSaVW6XHQlOq6RBNQsMR0
MS5k5TZgX0HZXAu0dGaPNzD7868dwTZE7tn6a1QanfmrqQVbJxHWTJtPEE0QGpGI
vg2D6iiYUE0mVEiKf7dNp6hpp9ioYIdsRQK0H/u3sU/JfEFr00XpMb+BAoGBANFp
wMIcB7RbyShO8QR/kVpahlOOnNDP7e+9KdUFl8i300ecO2QNR+hlQ+565J8nsANj
Y2kLMls2DTzMefrEZPecb8onjGFmSkwf9uCs8vmorlYmYmNlJkLL06ZN3SrpmHBD
GogkCt1qkrTgtszjSqZe94UcpT+mfatSK4lRlSX/AoGAXdE7Ns/Ji6KDVIm6dFOs
TdbeCsV+DmAgFAKQdKgNLA1jJzP8F7Aleb5zYCE9AYIlM9rAh25X7msYf1m5LrSg
Vae9weWlVZ6aNSi6ztRTYEkXAzGXNL3jERFkEM5BuM+iGtqnBjiHD9NjK/bJ5Cnn
VvQ1L/sa0G9oBZ7/GCWG2IECgYEAqAOmENb2Y4FEyl9Txl0nXIvGvCFutaYt66wk
dPIQzoyWKh0yFVsGd3FP6HWXGg54jK9gIfZGx6F9S2tu7oBF1dggZNwIKFkugRcg
NzDrnNz2Ss5vH/oWkX8BZ6uPKA/VKzTbg6EPSohn/lFQuOAfk44cHyNVfdTxfNPn
dDwNYzcCgYEA3Wig1HRNvTOnSskFz8eTmpu89hg1atuAk+c2d1Z+9HKVjkc6B/91
pabnbujERtH3HW9TQ9+VVfImgC3Jy+TjsS3d6nA7e9060N9z30mVEY5lq03mKAMl
tSKFk4fRRxsKPsBN/NS0BiU8LJTzsDwLwRm9T4BNos+I35a8tFCCmtw=
MIIEowIBAAKCAQEAs15ZPekT5AV+JEnMqacAxStajcraP9Obf5eeXpYvPrv/Stxi
sltsq9Le7JE/+y1fJcQrD0J6nJDVWIFUWRYCjLypAQ5YUOxlz/lTOFdSdvotevep
OQUSM2aGP7u7O/+VsHGVQbU2VjNJ44Hr9FPzMf+WE54qEudZg9d1cqxrUUvqKPhf
wN4M7WRjt+8AaYGf9MeHW5OkOLMzhiZ4j7vh2ZrIDnFe8BG17mHhW7lIjN7uILTn
s36/KZOgyTYmCT5lkWnJeuer7KjpFoPcA1yWtzgVt5RBnlrmzlLGCz1rRjdYwb7t
zlWdVdxuMFHP9qrY206dBCOKQopADwYXTzQFCQIDAQABAoIBAQCuvCbr7Pd3lvI/
n7VFQG+7pHRe1VKwAxDkx2t8cYos7y/QWcm8Ptwqtw58HzPZGWYrgGMCRpzzkRSF
V9g3wP1S5Scu5C6dBu5YIGc157tqNGXB+SpdZddJQ4Nc6yGHXYERllT04ffBGc3N
WG/oYS/1cSteiSIrsDy/91FvGRCi7FPxH3wIgHssY/tw69s1Cfvaq5lr2NTFzxIG
xCvpJKEdSfVfS9I7LYiymVjst3IOR/w76/ZFY9cRa8ZtmQSWWsm0TUpRC1jdcbkm
ZoJptYWlP+gSwx/fpMYftrkJFGOJhHJHQhwxT5X/ajAISeqjjwkWSEJLwnHQd11C
Zy2+29lBAoGBANlEAIK4VxCqyPXNKfoOOi5dS64NfvyH4A1v2+KaHWc7lqaqPN49
ezfN2n3X+KWx4cviDD914Yc2JQ1vVJjSaHci7yivocDo2OfZDmjBqzaMp/y+rX1R
/f3MmiTqMa468rjaxI9RRZu7vDgpTR+za1+OBCgMzjvAng8dJuN/5gjlAoGBANNY
uYPKtearBmkqdrSV7eTUe49Nhr0XotLaVBH37TCW0Xv9wjO2xmbm5Ga/DCtPIsBb
yPeYwX9FjoasuadUD7hRvbFu6dBa0HGLmkXRJZTcD7MEX2Lhu4BuC72yDLLFd0r+
Ep9WP7F5iJyagYqIZtz+4uf7gBvUDdmvXz3sGr1VAoGAdXTD6eeKeiI6PlhKBztF
zOb3EQOO0SsLv3fnodu7ZaHbUgLaoTMPuB17r2jgrYM7FKQCBxTNdfGZmmfDjlLB
0xZ5wL8ibU30ZXL8zTlWPElST9sto4B+FYVVF/vcG9sWeUUb2ncPcJ/Po3UAktDG
jYQTTyuNGtSJHpad/YOZctkCgYBtWRaC7bq3of0rJGFOhdQT9SwItN/lrfj8hyHA
OjpqTV4NfPmhsAtu6j96OZaeQc+FHvgXwt06cE6Rt4RG4uNPRluTFgO7XYFDfitP
vCppnoIw6S5BBvHwPP+uIhUX2bsi/dm8vu8tb+gSvo4PkwtFhEr6I9HglBKmcmog
q6waEQKBgHyecFBeM6Ls11Cd64vborwJPAuxIW7HBAFj/BS99oeG4TjBx4Sz2dFd
rzUibJt4ndnHIvCN8JQkjNG14i9hJln+H3mRss8fbZ9vQdqG+2vOWADYSzzsNI55
RFY7JjluKcVkp/zCDeUxTU3O6sS+v6/3VE11Cob6OYQx3lN5wrZ3
-----END RSA PRIVATE KEY-----

View File

@ -414,6 +414,9 @@ log.dir = {{ platform_log_dir }}
## The log filename for logs of level specified in "log.level".
##
## If `log.rotation` is enabled, this is the base name of the
## files. Each file in a rotated log is named <base_name>.N, where N is an integer.
##
## Value: String
## Default: emqx.log
log.file = emqx.log
@ -424,6 +427,14 @@ log.file = emqx.log
## Default: No Limit
#log.chars_limit = 8192
## Enables the log rotation.
## With this enabled, new log files will be created when the current
## log file is full, max to `log.rotation.size` files will be created.
##
## Value: on | off
## Default: on
log.rotation = on
## Maximum size of each log file.
##
## Value: Number
@ -446,9 +457,103 @@ log.rotation.count = 5
## Note: Log files for a specific log level will only contain all the logs
## that higher than or equal to that level
##
#log.info.file = info.log
#log.info.file = info.log
#log.error.file = error.log
## The max allowed queue length before switching to sync mode.
##
## Log overload protection parameter. If the message queue grows
## larger than this value the handler switches from anync to sync mode.
##
## Default: 100
##
#log.sync_mode_qlen = 100
## The max allowed queue length before switching to drop mode.
##
## Log overload protection parameter. When the message queue grows
## larger than this threshold, the handler switches to a mode in which
## it drops all new events that senders want to log.
##
## Default: 3000
##
#log.drop_mode_qlen = 3000
## The max allowed queue length before switching to flush mode.
##
## Log overload protection parameter. If the length of the message queue
## grows larger than this threshold, a flush (delete) operation takes place.
## To flush events, the handler discards the messages in the message queue
## by receiving them in a loop without logging.
##
## Default: 8000
##
#log.flush_qlen = 8000
## Kill the log handler when it gets overloaded.
##
## Log overload protection parameter. It is possible that a handler,
## even if it can successfully manage peaks of high load without crashing,
## can build up a large message queue, or use a large amount of memory.
## We could kill the log handler in these cases and restart it after a
## few seconds.
##
## Default: on
##
#log.overload_kill = on
## The max allowed queue length before killing the log hanlder.
##
## Log overload protection parameter. This is the maximum allowed queue
## length. If the message queue grows larger than this, the handler
## process is terminated.
##
## Default: 20000
##
#log.overload_kill_qlen = 20000
## The max allowed memory size before killing the log hanlder.
##
## Log overload protection parameter. This is the maximum memory size
## that the handler process is allowed to use. If the handler grows
## larger than this, the process is terminated.
##
## Default: 30MB
##
#log.overload_kill_mem_size = 30MB
## Restart the log hanlder after some seconds.
##
## Log overload protection parameter. If the handler is terminated,
## it restarts automatically after a delay specified in seconds.
## The value "infinity" prevents restarts.
##
## Default: 5s
##
#log.overload_kill_restart_after = 5s
## Max burst count and time window for burst control.
##
## Log overload protection parameter. Large bursts of log events - many
## events received by the handler under a short period of time - can
## potentially cause problems. By specifying the maximum number of events
## to be handled within a certain time frame, the handler can avoid
## choking the log with massive amounts of printouts.
##
## This config controls the maximum number of events to handle within
## a time frame. After the limit is reached, successive events are
## dropped until the end of the time frame.
##
## Note that there would be no warning if any messages were
## dropped because of burst control.
##
## Comment this config out to disable the burst control feature.
##
## Value: MaxBurstCount,TimeWindow
## Default: disabled
##
#log.burst_limit = 20000, 1s
##--------------------------------------------------------------------
## Authentication/Access Control
##--------------------------------------------------------------------
@ -1399,18 +1504,6 @@ listener.ws.external.access.1 = allow all
## Value: on | off
listener.ws.external.verify_protocol_header = on
## Use X-Forwarded-For header for real source IP if the EMQ X cluster is
## deployed behind NGINX or HAProxy.
##
## Value: String
## listener.ws.external.proxy_address_header = X-Forwarded-For
## Use X-Forwarded-Port header for real source port if the EMQ X cluster is
## deployed behind NGINX or HAProxy.
##
## Value: String
## listener.ws.external.proxy_port_header = X-Forwarded-Port
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
## HAProxy or Nginx.
##

View File

@ -477,11 +477,21 @@ end}.
{datatype, integer}
]}.
{mapping, "log.rotation", "kernel.logger", [
{default, on},
{datatype, flag}
]}.
{mapping, "log.rotation.size", "kernel.logger", [
{default, "10MB"},
{datatype, bytesize}
]}.
{mapping, "log.size", "kernel.logger", [
{default, infinity},
{datatype, [bytesize, atom]}
]}.
{mapping, "log.rotation.count", "kernel.logger", [
{default, 5},
{datatype, integer}
@ -491,6 +501,46 @@ end}.
{datatype, file}
]}.
{mapping, "log.sync_mode_qlen", "kernel.logger", [
{default, 100},
{datatype, integer}
]}.
{mapping, "log.drop_mode_qlen", "kernel.logger", [
{default, 3000},
{datatype, integer}
]}.
{mapping, "log.flush_qlen", "kernel.logger", [
{default, 8000},
{datatype, integer}
]}.
{mapping, "log.overload_kill", "kernel.logger", [
{default, on},
{datatype, flag}
]}.
{mapping, "log.overload_kill_mem_size", "kernel.logger", [
{default, "30MB"},
{datatype, bytesize}
]}.
{mapping, "log.overload_kill_qlen", "kernel.logger", [
{default, 20000},
{datatype, integer}
]}.
{mapping, "log.overload_kill_restart_after", "kernel.logger", [
{default, "5s"},
{datatype, [{duration, ms}, atom]}
]}.
{mapping, "log.burst_limit", "kernel.logger", [
{default, "disabled"},
{datatype, string}
]}.
{mapping, "log.sasl", "sasl.sasl_error_logger", [
{default, off},
{datatype, flag},
@ -521,6 +571,10 @@ end}.
{translation, "kernel.logger", fun(Conf) ->
LogTo = cuttlefish:conf_get("log.to", Conf),
LogLevel = cuttlefish:conf_get("log.level", Conf),
LogType = case cuttlefish:conf_get("log.rotation", Conf) of
true -> wrap;
false -> halt
end,
CharsLimit = case cuttlefish:conf_get("log.chars_limit", Conf) of
-1 -> unlimited;
V -> V
@ -537,11 +591,37 @@ end}.
[]}]},
msg,"\n"],
chars_limit => CharsLimit}},
{BustLimitOn, {MaxBurstCount, TimeWindow}} =
case string:tokens(cuttlefish:conf_get("log.burst_limit", Conf), ", ") of
["disabled"] -> {false, {20000, 1000}};
[Count, Window] ->
{true, {list_to_integer(Count),
case cuttlefish_duration:parse(Window, ms) of
Secs when is_integer(Secs) -> Secs;
{error, Reason1} -> error(Reason1)
end}}
end,
FileConf = fun(Filename) ->
#{type => wrap,
BasicConf =
#{type => LogType,
file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename),
max_no_files => cuttlefish:conf_get("log.rotation.count", Conf),
max_no_bytes => cuttlefish:conf_get("log.rotation.size", Conf)}
sync_mode_qlen => cuttlefish:conf_get("log.sync_mode_qlen", Conf),
drop_mode_qlen => cuttlefish:conf_get("log.drop_mode_qlen", Conf),
flush_qlen => cuttlefish:conf_get("log.flush_qlen", Conf),
overload_kill_enable => cuttlefish:conf_get("log.overload_kill", Conf),
overload_kill_qlen => cuttlefish:conf_get("log.overload_kill_qlen", Conf),
overload_kill_mem_size => cuttlefish:conf_get("log.overload_kill_mem_size", Conf),
overload_kill_restart_after => cuttlefish:conf_get("log.overload_kill_restart_after", Conf),
burst_limit_enable => BustLimitOn,
burst_limit_max_count => MaxBurstCount,
burst_limit_window_time => TimeWindow
},
MaxNoBytes = case LogType of
wrap -> cuttlefish:conf_get("log.rotation.size", Conf);
halt -> cuttlefish:conf_get("log.size", Conf)
end,
BasicConf#{max_no_bytes => MaxNoBytes}
end,
%% For the default logger that outputs to console
@ -1310,16 +1390,6 @@ end}.
{datatype, flag}
]}.
{mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners", [
{datatype, string},
hidden
]}.
{mapping, "listener.ws.$name.proxy_port_header", "emqx.listeners", [
{datatype, string},
hidden
]}.
{mapping, "listener.ws.$name.proxy_protocol", "emqx.listeners", [
{datatype, flag}
]}.
@ -1467,16 +1537,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.wss.$name.proxy_address_header", "emqx.listeners", [
{datatype, string},
hidden
]}.
{mapping, "listener.wss.$name.proxy_port_header", "emqx.listeners", [
{datatype, string},
hidden
]}.
{mapping, "listener.wss.$name.proxy_protocol", "emqx.listeners", [
{datatype, flag}
]}.
@ -1681,11 +1741,9 @@ end}.
{proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
{verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
{proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
{compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)},
{idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)},
{max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)},
{proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)])
{max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)} | AccOpts(Prefix)])
end,
DeflateOpts = fun(Prefix) ->
Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)},

View File

@ -5,7 +5,7 @@
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.2"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.1"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.2"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.3"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.

View File

@ -114,7 +114,7 @@ handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) ->
{error, Reason} ->
?LOG(error, "Failed to encode alarm: ~p", [Reason])
end,
set_alarm_(AlarmId, AlarmDesc),
set_alarm_(AlarmId, AlarmDesc, erlang:system_time(second)),
{ok, State};
handle_event({clear_alarm, AlarmId}, State) ->
?LOG(info, "Clear Alarm: ~p", [AlarmId]),
@ -164,10 +164,12 @@ encode_alarm({AlarmId, #alarm{severity = Severity,
});
encode_alarm({AlarmId, undefined}) ->
emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId)});
emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId),
desc => #{timestamp => erlang:system_time(second)}});
encode_alarm({AlarmId, AlarmDesc}) ->
emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId),
desc => maybe_to_binary(AlarmDesc)
desc => #{summary => maybe_to_binary(AlarmDesc),
timestamp => erlang:system_time(second)}
}).
alarm_msg(Topic, Payload) ->
@ -185,8 +187,8 @@ maybe_to_binary(Data) when is_binary(Data) ->
maybe_to_binary(Data) ->
iolist_to_binary(io_lib:format("~p", [Data])).
set_alarm_(Id, Desc) ->
mnesia:dirty_write(?ALARM_TAB, #common_alarm{id = Id, desc = Desc}).
set_alarm_(Id, Desc, Ts) ->
mnesia:dirty_write(?ALARM_TAB, #common_alarm{id = Id, desc = {Desc, Ts}}).
clear_alarm_(Id) ->
case mnesia:dirty_read(?ALARM_TAB, Id) of

View File

@ -217,6 +217,9 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
handle_out(connack, {ReasonCode, ConnPkt}, NChannel)
end;
handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when ConnState =/= connected ->
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
case emqx_packet:check(Packet) of
ok -> process_publish(Packet, Channel);
@ -482,6 +485,21 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
{error, RC} -> {RC, Channel}
end.
-compile({inline, [process_force_subscribe/2]}).
process_force_subscribe(Subscriptions, Channel =
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
session = Session}) ->
lists:foldl(fun({TopicFilter, SubOpts = #{qos := QoS}}, {ReasonCodes, ChannelAcc}) ->
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), ChannelAcc),
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
{ok, NSession} ->
{ReasonCodes ++ [QoS], ChannelAcc#channel{session = NSession}};
{error, ReasonCode} ->
{ReasonCodes ++ [ReasonCode], ChannelAcc}
end
end, {[], Channel}, Subscriptions).
%%--------------------------------------------------------------------
%% Process Unsubscribe
%%--------------------------------------------------------------------
@ -507,6 +525,20 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
{error, RC} -> {RC, Channel}
end.
-compile({inline, [process_force_unsubscribe/2]}).
process_force_unsubscribe(Subscriptions, Channel =
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
session = Session}) ->
lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) ->
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of
{ok, NSession} ->
{ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}};
{error, ReasonCode} ->
{ReasonCodes ++ [ReasonCode], ChannelAcc}
end
end, {[], Channel}, Subscriptions).
%%--------------------------------------------------------------------
%% Process Disconnect
%%--------------------------------------------------------------------
@ -760,6 +792,10 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
{ok, NChannel};
handle_info({force_subscribe, TopicFilters}, Channel) ->
{_ReasonCodes, NChannel} = process_force_subscribe(parse_topic_filters(TopicFilters), Channel),
{ok, NChannel};
handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
TopicFilters1 = run_hooks('client.unsubscribe',
[ClientInfo, #{'Internal' => true}],
@ -768,6 +804,10 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
{ok, NChannel};
handle_info({force_unsubscribe, TopicFilters}, Channel) ->
{_ReasonCodes, NChannel} = process_force_unsubscribe(parse_topic_filters(TopicFilters), Channel),
{ok, NChannel};
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
shutdown(Reason, Channel);
@ -843,10 +883,10 @@ handle_timeout(_TRef, expire_awaiting_rel,
handle_timeout(_TRef, expire_awaiting_rel,
Channel = #channel{session = Session}) ->
case emqx_session:expire(awaiting_rel, Session) of
{ok, Session} ->
{ok, clean_timer(await_timer, Channel#channel{session = Session})};
{ok, Timeout, Session} ->
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})}
{ok, NSession} ->
{ok, clean_timer(await_timer, Channel#channel{session = NSession})};
{ok, Timeout, NSession} ->
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})}
end;
handle_timeout(_TRef, expire_session, Channel) ->

View File

@ -27,9 +27,7 @@
-export([start_link/0]).
-export([ register_channel/1
, register_channel/2
, register_channel/3
-export([ register_channel/3
, unregister_channel/1
]).
@ -45,6 +43,8 @@
, set_chan_stats/2
]).
-export([get_chann_conn_mod/2]).
-export([ open_session/3
, discard_session/1
, discard_session/2
@ -98,28 +98,29 @@ start_link() ->
%% API
%%--------------------------------------------------------------------
%% @doc Register a channel.
-spec(register_channel(emqx_types:clientid()) -> ok).
register_channel(ClientId) ->
register_channel(ClientId, self()).
%% @doc Register a channel with pid.
-spec(register_channel(emqx_types:clientid(), chan_pid()) -> ok).
register_channel(ClientId, ChanPid) when is_pid(ChanPid) ->
Chan = {ClientId, ChanPid},
true = ets:insert(?CHAN_TAB, Chan),
true = ets:insert(?CHAN_CONN_TAB, Chan),
ok = emqx_cm_registry:register_channel(Chan),
cast({registered, Chan}).
%% @doc Register a channel with info and stats.
-spec(register_channel(emqx_types:clientid(),
emqx_types:infos(),
emqx_types:stats()) -> ok).
register_channel(ClientId, Info, Stats) ->
register_channel(ClientId, Info = #{conninfo := ConnInfo}, Stats) ->
Chan = {ClientId, ChanPid = self()},
true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
register_channel(ClientId, ChanPid).
register_channel(ClientId, ChanPid, ConnInfo);
%% @private
%% @doc Register a channel with pid and conn_mod.
%%
%% There is a Race-Condition on one node or cluster when many connections
%% login to Broker with the same clientid. We should register it and save
%% the conn_mod first for taking up the clientid access right.
%%
%% Note that: It should be called on a lock transaction
register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) ->
Chan = {ClientId, ChanPid},
true = ets:insert(?CHAN_TAB, Chan),
true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}),
ok = emqx_cm_registry:register_channel(Chan),
cast({registered, Chan}).
%% @doc Unregister a channel.
-spec(unregister_channel(emqx_types:clientid()) -> ok).
@ -130,7 +131,7 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
%% @private
do_unregister_channel(Chan) ->
ok = emqx_cm_registry:unregister_channel(Chan),
true = ets:delete_object(?CHAN_CONN_TAB, Chan),
true = ets:delete(?CHAN_CONN_TAB, Chan),
true = ets:delete(?CHAN_INFO_TAB, Chan),
ets:delete_object(?CHAN_TAB, Chan).
@ -204,24 +205,29 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
pendings => list()}}
| {error, Reason :: term()}).
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
CleanStart = fun(_) ->
ok = discard_session(ClientId),
Session = create_session(ClientInfo, ConnInfo),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
end,
emqx_cm_locker:trans(ClientId, CleanStart);
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
ResumeStart = fun(_) ->
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
pendings => Pendings}};
{error, not_found} ->
Session = create_session(ClientInfo, ConnInfo),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
end
end,
@ -251,8 +257,8 @@ takeover_session(ClientId) ->
end.
takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
case get_chann_conn_mod(ClientId, ChanPid) of
ConnMod when is_atom(ConnMod) ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}),
{ok, ConnMod, ChanPid, Session};
undefined ->
@ -282,8 +288,8 @@ discard_session(ClientId) when is_binary(ClientId) ->
end.
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
case get_chann_conn_mod(ClientId, ChanPid) of
ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard);
undefined -> ok
end;
@ -411,3 +417,12 @@ update_stats({Tab, Stat, MaxStat}) ->
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
end.
get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod
catch
error:badarg -> undefined
end;
get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).

View File

@ -109,7 +109,7 @@ init([]) ->
{read_concurrency, true},
{write_concurrency, true}
]),
{ok, #{}, hibernate}.
{ok, ensure_timer(#{}), hibernate}.
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
@ -142,6 +142,12 @@ handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, TRef, expired_detecting}, State = #{expired_timer := TRef}) ->
Timestamp = erlang:system_time(millisecond) - maps:get(duration, get_policy()),
MatchSpec = [{{'_', '_', '_', '$1', '_'},[{'<', '$1', Timestamp}], [true]}],
ets:select_delete(?FLAPPING_TAB, MatchSpec),
{noreply, ensure_timer(State), hibernate};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
@ -151,3 +157,8 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
ensure_timer(State) ->
Timeout = maps:get(duration, get_policy()),
TRef = emqx_misc:start_timer(Timeout, expired_detecting),
State#{expired_timer => TRef}.

View File

@ -55,7 +55,7 @@ encode(Term) ->
-spec(encode(json_term(), encode_options()) -> json_text()).
encode(Term, Opts) ->
jiffy:encode(to_ejson(Term), Opts).
to_binary(jiffy:encode(to_ejson(Term), Opts)).
-spec(safe_encode(json_term())
-> {ok, json_text()} | {error, Reason :: term()}).
@ -118,3 +118,7 @@ from_ejson({L}) ->
[{Name, from_ejson(Value)} || {Name, Value} <- L];
from_ejson(T) -> T.
to_binary(B) when is_binary(B) -> B;
to_binary(L) when is_list(L) ->
iolist_to_binary(L).

View File

@ -191,7 +191,13 @@ init(Req, Opts) ->
end.
websocket_init([Req, Opts]) ->
Peername = cowboy_req:peer(Req),
Peername = case proplists:get_bool(proxy_protocol, Opts)
andalso maps:get(proxy_header, Req) of
#{src_address := SrcAddr, src_port := SrcPort} ->
{SrcAddr, SrcPort};
_ ->
cowboy_req:peer(Req)
end,
Sockname = cowboy_req:sock(Req),
Peercert = cowboy_req:cert(Req),
WsCookie = try cowboy_req:parse_cookies(Req)

View File

@ -112,6 +112,12 @@ t_handle_in_unexpected_connect_packet(_) ->
{ok, [{outgoing, Packet}, {close, protocol_error}], Channel} =
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
t_handle_in_unexpected_packet(_) ->
Channel = emqx_channel:set_field(conn_state, idle, channel()),
Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
{ok, [{outgoing, Packet}, {close, protocol_error}], Channel} =
emqx_channel:handle_in(?PUBLISH_PACKET(?QOS_0), Channel).
t_handle_in_qos0_publish(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Channel = channel(#{conn_state => connected}),

View File

@ -279,7 +279,7 @@ t_username_as_clientid(_) ->
emqtt:disconnect(C).
t_certcn_as_clientid(_) ->
CN = <<"0004.novalocal">>,
CN = <<"Client">>,
emqx_zone:set_env(external, use_username_as_clientid, true),
SslConf = emqx_ct_helpers:client_ssl_twoway(),
{ok, C} = emqtt:start_link([{port, 8883}, {ssl, true}, {ssl_opts, SslConf}]),

View File

@ -23,6 +23,13 @@
-include_lib("eunit/include/eunit.hrl").
-define(CM, emqx_cm).
-define(ChanInfo,#{conninfo =>
#{socktype => tcp,
peername => {{127,0,0,1}, 5000},
sockname => {{127,0,0,1}, 1883},
peercert => nossl,
conn_mod => emqx_connection,
receive_maximum => 100}}).
%%--------------------------------------------------------------------
%% CT callbacks
@ -43,13 +50,13 @@ end_per_suite(_Config) ->
%%--------------------------------------------------------------------
t_reg_unreg_channel(_) ->
ok = emqx_cm:register_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
?assertEqual([self()], emqx_cm:lookup_channels(<<"clientid">>)),
ok = emqx_cm:unregister_channel(<<"clientid">>),
?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
t_get_set_chan_info(_) ->
Info = #{proto_ver => 4, proto_name => <<"MQTT">>},
Info = ?ChanInfo,
ok = emqx_cm:register_channel(<<"clientid">>, Info, []),
?assertEqual(Info, emqx_cm:get_chan_info(<<"clientid">>)),
Info1 = Info#{proto_ver => 5},
@ -60,7 +67,7 @@ t_get_set_chan_info(_) ->
t_get_set_chan_stats(_) ->
Stats = [{recv_oct, 10}, {send_oct, 8}],
ok = emqx_cm:register_channel(<<"clientid">>, #{}, Stats),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, Stats),
?assertEqual(Stats, emqx_cm:get_chan_stats(<<"clientid">>)),
Stats1 = [{recv_oct, 10}|Stats],
true = emqx_cm:set_chan_stats(<<"clientid">>, Stats1),
@ -69,27 +76,89 @@ t_get_set_chan_stats(_) ->
?assertEqual(undefined, emqx_cm:get_chan_stats(<<"clientid">>)).
t_open_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ClientInfo = #{zone => external,
clientid => <<"clientid">>,
username => <<"username">>,
peerhost => {127,0,0,1}},
ConnInfo = #{peername => {{127,0,0,1}, 5000},
ConnInfo = #{socktype => tcp,
peername => {{127,0,0,1}, 5000},
sockname => {{127,0,0,1}, 1883},
peercert => nossl,
conn_mod => emqx_connection,
receive_maximum => 100},
{ok, #{session := Session1, present := false}}
= emqx_cm:open_session(true, ClientInfo, ConnInfo),
?assertEqual(100, emqx_session:info(inflight_max, Session1)),
{ok, #{session := Session2, present := false}}
= emqx_cm:open_session(false, ClientInfo, ConnInfo),
?assertEqual(100, emqx_session:info(inflight_max, Session2)).
= emqx_cm:open_session(true, ClientInfo, ConnInfo),
?assertEqual(100, emqx_session:info(inflight_max, Session2)),
emqx_cm:unregister_channel(<<"clientid">>),
ok = meck:unload(emqx_connection).
t_open_session_race_condition(_) ->
ClientInfo = #{zone => external,
clientid => <<"clientid">>,
username => <<"username">>,
peerhost => {127,0,0,1}},
ConnInfo = #{socktype => tcp,
peername => {{127,0,0,1}, 5000},
sockname => {{127,0,0,1}, 1883},
peercert => nossl,
conn_mod => emqx_connection,
receive_maximum => 100},
Parent = self(),
OpenASession = fun() ->
timer:sleep(rand:uniform(100)),
OpenR = (emqx_cm:open_session(true, ClientInfo, ConnInfo)),
Parent ! OpenR,
case OpenR of
{ok, _} ->
receive
{'$gen_call', From, discard} ->
gen_server:reply(From, ok), ok
end;
{error, Reason} ->
exit(Reason)
end
end,
[spawn(
fun() ->
spawn(OpenASession),
spawn(OpenASession)
end) || _ <- lists:seq(1, 1000)],
WaitingRecv = fun _Wr(N1, N2, 0) ->
{N1, N2};
_Wr(N1, N2, Rest) ->
receive
{ok, _} -> _Wr(N1+1, N2, Rest-1);
{error, _} -> _Wr(N1, N2+1, Rest-1)
end
end,
ct:pal("Race condition status: ~p~n", [WaitingRecv(0, 0, 2000)]),
?assertEqual(1, ets:info(emqx_channel, size)),
?assertEqual(1, ets:info(emqx_channel_conn, size)),
?assertEqual(1, ets:info(emqx_channel_registry, size)),
[Pid] = emqx_cm:lookup_channels(<<"clientid">>),
exit(Pid, kill), timer:sleep(100),
?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
t_discard_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
ok = emqx_cm:discard_session(<<"clientid">>),
@ -97,35 +166,26 @@ t_discard_session(_) ->
ok = meck:unload(emqx_connection).
t_takeover_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
{error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>),
{error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
Pid = self(),
{ok, emqx_connection, Pid, test} = emqx_cm:takeover_session(<<"clientid">>),
erlang:spawn(fun() ->
ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
timer:sleep(1000)
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
receive
{'$gen_call', From, {takeover, 'begin'}} ->
gen_server:reply(From, test), ok
end
end),
ct:sleep(100),
timer:sleep(100),
{ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = meck:unload(emqx_connection).
emqx_cm:unregister_channel(<<"clientid">>).
t_kick_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>),
{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
test = emqx_cm:kick_session(<<"clientid">>),
erlang:spawn(fun() ->
ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
timer:sleep(1000)
end),
ct:sleep(100),

View File

@ -19,6 +19,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
@ -50,7 +52,7 @@ t_detect_check(_) ->
false = emqx_flapping:detect(ClientInfo),
false = emqx_banned:check(ClientInfo),
true = emqx_flapping:detect(ClientInfo),
timer:sleep(100),
timer:sleep(50),
true = emqx_banned:check(ClientInfo),
timer:sleep(3000),
false = emqx_banned:check(ClientInfo),
@ -61,3 +63,13 @@ t_detect_check(_) ->
Pid ! test,
ok = emqx_flapping:stop().
t_expired_detecting(_) ->
ClientInfo = #{zone => external,
clientid => <<"clientid">>,
peerhost => {127,0,0,1}},
false = emqx_flapping:detect(ClientInfo),
?assertEqual(true, lists:any(fun({flapping, <<"clientid">>, _, _, _}) -> true;
(_) -> false end, ets:tab2list(emqx_flapping))),
timer:sleep(200),
?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false;
(_) -> true end, ets:tab2list(emqx_flapping))).