Compare commits
165 Commits
Author | SHA1 | Date |
---|---|---|
![]() |
0e97dfff29 | |
![]() |
d409e25e76 | |
![]() |
c1f8d7de2f | |
![]() |
76e6ded825 | |
![]() |
d730b24494 | |
![]() |
978153d993 | |
![]() |
b47a4f4422 | |
![]() |
7124a40f3d | |
![]() |
614b836983 | |
![]() |
85d81a84d0 | |
![]() |
243e6b3571 | |
![]() |
ad5b954f1d | |
![]() |
21293140d3 | |
![]() |
1150e06d18 | |
![]() |
b8cf9e14de | |
![]() |
dbea5df7dc | |
![]() |
ac93725f74 | |
![]() |
1574d50387 | |
![]() |
0a226ca4bc | |
![]() |
9c2584607a | |
![]() |
2d99a1412e | |
![]() |
6f4b8d637b | |
![]() |
0943129287 | |
![]() |
0908974017 | |
![]() |
19a8f0cbf8 | |
![]() |
b4bbfad415 | |
![]() |
68f6a43492 | |
![]() |
ac1e10bb60 | |
![]() |
31671f5ee5 | |
![]() |
ffef64a803 | |
![]() |
58ba22dfc7 | |
![]() |
0e82170ed5 | |
![]() |
2c73ab9713 | |
![]() |
997b693400 | |
![]() |
1a5d8ca3fd | |
![]() |
be2ce93a2c | |
![]() |
90ace6a331 | |
![]() |
4aff37772f | |
![]() |
0daa703193 | |
![]() |
155eb82283 | |
![]() |
3410151c24 | |
![]() |
43cf0fbab4 | |
![]() |
fa7292560a | |
![]() |
bffca305c1 | |
![]() |
a6210f7142 | |
![]() |
2fc3e560d2 | |
![]() |
6ec7cb5090 | |
![]() |
4d14d51dcb | |
![]() |
def758152d | |
![]() |
1c17d514aa | |
![]() |
b8ce42fd29 | |
![]() |
47d628f9d9 | |
![]() |
c9498d7986 | |
![]() |
19fdf3b77a | |
![]() |
1a2592b13c | |
![]() |
86feae6adc | |
![]() |
5835c06745 | |
![]() |
7c3e5d765c | |
![]() |
872c0af3fd | |
![]() |
411f8a0ec7 | |
![]() |
e76451000c | |
![]() |
65ae10a651 | |
![]() |
b9715b9e71 | |
![]() |
0625bfe3f8 | |
![]() |
6b43acc1c1 | |
![]() |
854a48d77c | |
![]() |
ea6d9f12d9 | |
![]() |
5025b2f65d | |
![]() |
97f9e7123d | |
![]() |
f0a434739d | |
![]() |
076776d7d4 | |
![]() |
44d901eb1e | |
![]() |
297a385def | |
![]() |
bd664ae370 | |
![]() |
21c18d15d4 | |
![]() |
af6ad8a90f | |
![]() |
a2a8e42880 | |
![]() |
0f1b678126 | |
![]() |
1fbc50530a | |
![]() |
0c104faef7 | |
![]() |
8662f55c4c | |
![]() |
cc43da0fd5 | |
![]() |
e6ccbc601c | |
![]() |
22084025e6 | |
![]() |
2b1a2f5e13 | |
![]() |
2164c9149b | |
![]() |
21e31ab1c8 | |
![]() |
1aa30cba89 | |
![]() |
79e37cba0d | |
![]() |
127427b783 | |
![]() |
ff0fd66725 | |
![]() |
5a4adefb16 | |
![]() |
24b4d83c12 | |
![]() |
3d45da8e03 | |
![]() |
962fb0cec5 | |
![]() |
dac1b92d8f | |
![]() |
5fb4f23504 | |
![]() |
5f53952b45 | |
![]() |
e349136cb2 | |
![]() |
28bd9a7fda | |
![]() |
c4bf5aa34c | |
![]() |
f837ac47e7 | |
![]() |
b20e87f98e | |
![]() |
ee9f278738 | |
![]() |
8751c10ea5 | |
![]() |
891ef2680e | |
![]() |
b461e26f25 | |
![]() |
d5b17c516e | |
![]() |
bb9c41c9f0 | |
![]() |
88dbbc3a44 | |
![]() |
a6f138b55c | |
![]() |
55ec358cd6 | |
![]() |
067d28dcb6 | |
![]() |
118e67a8ca | |
![]() |
ff9fccdb07 | |
![]() |
87f12256ca | |
![]() |
9c1421f4b8 | |
![]() |
e58b0fc1db | |
![]() |
cd8dacabd6 | |
![]() |
dc76d7ef93 | |
![]() |
be76a6e50a | |
![]() |
b55e76001a | |
![]() |
0adc8b39af | |
![]() |
d191ef0303 | |
![]() |
4aa50f0f6e | |
![]() |
9433e563fb | |
![]() |
aa6ae3ad8a | |
![]() |
8d6a78ce8e | |
![]() |
cad0f1a858 | |
![]() |
b7abf7ca7a | |
![]() |
3c6930b849 | |
![]() |
ed1f428ef6 | |
![]() |
d7e46b367e | |
![]() |
de319fa29a | |
![]() |
d998728847 | |
![]() |
f9dde2f049 | |
![]() |
609968dd31 | |
![]() |
0bcc692071 | |
![]() |
1e5241f401 | |
![]() |
83cfcc5d2f | |
![]() |
010e1fa9a8 | |
![]() |
b9378710d8 | |
![]() |
69665be6f8 | |
![]() |
849f0aaef7 | |
![]() |
1da9e4397e | |
![]() |
f5b77f2e7b | |
![]() |
e7e8131f2a | |
![]() |
f717b734c3 | |
![]() |
5b0c752181 | |
![]() |
92f3036f1f | |
![]() |
58e3555e9b | |
![]() |
e79c714ec4 | |
![]() |
2ee18ddebc | |
![]() |
490ac8f449 | |
![]() |
27fcb73483 | |
![]() |
5039b751f4 | |
![]() |
f13654dbce | |
![]() |
18100cacf9 | |
![]() |
ad5ece8c33 | |
![]() |
1f842e4a19 | |
![]() |
8822468528 | |
![]() |
3c0294eafe | |
![]() |
e722e0f6d1 | |
![]() |
ebbba23276 | |
![]() |
61567c0fcc |
|
@ -28,7 +28,7 @@ git clone https://github.com/emqx/emqx-rel.git
|
|||
|
||||
cd emqx-rel && make
|
||||
|
||||
cd _rel/emqx && ./bin/emqx console
|
||||
cd _build/emqx/rel/emqx && ./bin/emqx console
|
||||
|
||||
```
|
||||
|
||||
|
|
|
@ -1,17 +1,18 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICxjCCAa6gAwIBAgIJAJk1DbZBu8FDMA0GCSqGSIb3DQEBCwUAMBMxETAPBgNV
|
||||
BAMMCE15VGVzdENBMB4XDTE3MTEwMjEzNDI0N1oXDTE5MTEwMjEzNDI0N1owEzER
|
||||
MA8GA1UEAwwITXlUZXN0Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB
|
||||
AQDshDho6ef1JClDJ24peSsXdFnFO3xIB7+BSp1YPcOvmRECKUG0mLORw3hNm15m
|
||||
8eGOn1iLGE/xKlaZ74/xjyq8f7qIGZCmvZj59m+eiJCAmy8SiUJZtSVoOlOzepJd
|
||||
PoDgcBvDKA4ogZ3iJHMUNI3EdlD6nrKEJF2qe2JUrL0gv65uo2/N7XVNvE87Dk3J
|
||||
83KyCAmeu+x+moS1ILnjs2DuPEGSxZqzf7IQMbXuNWJYAOZg9t4Fg0YjTiAaWw3G
|
||||
JKAoMY4tI3JCqlvwGR4lH7kfk3WsD4ofGlFhxU4nEG0xgnJl8BcoJWD1A2RjGe1f
|
||||
qCijqPSe93l2wt8OpbyHzwc7AgMBAAGjHTAbMAwGA1UdEwQFMAMBAf8wCwYDVR0P
|
||||
BAQDAgEGMA0GCSqGSIb3DQEBCwUAA4IBAQAi+t5jBrMxFzoF76kyRd3riNDlWp0w
|
||||
NCewkohBkwBHsQfHzSnc6c504jdyzkEiD42UcI8asPsJcsYrQ+Uo6OBn049u49Wn
|
||||
zcSERVSVec1/TAPS/egFTU9QMWtPSAm8AEaQ6YYAuiwOLCcC+Cm/a3e3dWSRWt8o
|
||||
LqKX6CWTlmKWe182MhFPpZYxZQLGapti4R4mb5QusUbc6tXbkcX82GjDPTOuAw7b
|
||||
mWpzVd5xnlp7Vz+50u+YaAYUmCobg0hR/AuTrA4GDMlgzTnuZQhF6o8iVkypXOtS
|
||||
Ufz6X3tVVErVVc7UUfzSnupHj1M2h4rzlQ3oqHoAEnXcJmV4f/Pf/6FW
|
||||
MIIC0TCCAbmgAwIBAgIUDQN8HojZmyEV9+AzEz6j6juwThswDQYJKoZIhvcNAQEL
|
||||
BQAwEzERMA8GA1UEAwwITXlUZXN0Q0EwHhcNMTkxMTE1MDcyNjU4WhcNMjkxMTEy
|
||||
MDcyNjU4WjATMREwDwYDVQQDDAhNeVRlc3RDQTCCASIwDQYJKoZIhvcNAQEBBQAD
|
||||
ggEPADCCAQoCggEBALce8QYBpl7fxEhwW0wtBQygXisMcPTKzckz3RhU21TeqK1Z
|
||||
6Fm03QyYvB239oYJLodVwzv5SNI75hZ43Vyp+SHt3M3DjcsU/8PflxFK4QR7TdhI
|
||||
ddn6R59Gqt0MhAZ/df2dYt7cMaQV8/5plzxLvrv9X2fwo8BYAGp6g6wGAL8SJDT9
|
||||
jd9TGzBG/o3dLu3keEwcl0CMq3qUwxatBHMe2s7COKBrngD/CvRAL8tG3VTj7ep9
|
||||
n29SSS8qMzHhJdBahTDrYS+SeW61iFK1yLXSxCWNoMB0/g7/AktWuAXHdHRX9xaf
|
||||
WNJ4RdoPxhqkVJ8SrC4JtC8ah6DchVysWnz2KwMCAwEAAaMdMBswDAYDVR0TBAUw
|
||||
AwEB/zALBgNVHQ8EBAMCAQYwDQYJKoZIhvcNAQELBQADggEBAEgnPnHLdivykReJ
|
||||
I8xf5DeWsgBUdVvhxz2E9Ole/u6ThulNLziwHernkTprskiKFJaF67ZzS7YddTdf
|
||||
WsS0H5LhYaft5NnBcn9UHCKEycyr3AJZ6joB3Dd9CfMQEscnZHNmIXwPGxw4bYP6
|
||||
AElF0Iy7LY/Z8po/UACTBzCCSf5UkZ9Jy/rzxuvn/cfPcLNhDWk8b8MbmOfuyNPV
|
||||
SfPGn7wXIt9iyyA4qyzEVMaXl8d94E48dV5Fc1sQEEo6gk16dQ9p64ePMvUih6an
|
||||
kSz9X/n1+9sHq54pJmLZ2gfRvGPIPVIipSjAj4sjHvKzuC3CQTTXs9HzmN2nT0zx
|
||||
gLxgEkY=
|
||||
-----END CERTIFICATE-----
|
||||
|
|
|
@ -1,18 +1,18 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIIC6jCCAdKgAwIBAgIBATANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
|
||||
c3RDQTAeFw0xNzExMDIxMzQyNDhaFw0xOTExMDIxMzQyNDhaMC0xGjAYBgNVBAMT
|
||||
EU1hY0Jvb2stQWlyLmxvY2FsMQ8wDQYDVQQKEwZzZXJ2ZXIwggEiMA0GCSqGSIb3
|
||||
DQEBAQUAA4IBDwAwggEKAoIBAQDUO/kL3ar3WsopPF12qAf+cwDHklGJIxJsjdoZ
|
||||
XgI1lPEe1W1QXwb/G/tyf6Fj2J8CD5bfsRjDxAemFIBVrFwlunCk+Gs6xR7vzz4O
|
||||
Fonoj4pmleruLQrNY/bHa2WN97OdISyXzhOgDwSaqobnF0n/f0Mx+9sdHO3p8LNB
|
||||
3JXUyBpwDNr/TTfAb4pbQEu3LF4p7uyd1eLhKzUxSiWzKtjB1EYObA87fZu0tBJZ
|
||||
iGujuFiI7tf4qWKeuAoRa/cXkgVZhk0utYauDoa7qBZ5O6ZdEko9ov0+i5+1JGU/
|
||||
w5wrSPNAnM2lYVUn0kJmcV2gwa4RZFjdqp+/Fx+HnKbnhZEnAgMBAAGjLzAtMAkG
|
||||
A1UdEwQCMAAwCwYDVR0PBAQDAgUgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA0GCSqG
|
||||
SIb3DQEBCwUAA4IBAQByWhNxX/L5QYBiMY4JM1RRciV4uI3F2vsc0yMFDSrZza+5
|
||||
tNJQS86hjQsCRZh9VshezvT7k1yVsAC4pnu2pzob8H3KG4vYBafMdl2Ghgv3RMix
|
||||
J3NrBhcoYYhXEoZHost+htxEi7P3QBo/qDkk48/d30+aDPbms6kQd8Fj8+C5tD3b
|
||||
aznO5Qlni72uTaM7fNA8exoc/YZc83lsqv7v+UzNQR595jnYSIAZcgil1qqygOan
|
||||
Zx/RsMGUz6EYI9lPpoyyVtw13SoQshfgwvUlvBMiekSuI/pp6N7QPK6C8DLO0tVv
|
||||
gXJjDgioqHc3hcgG4cskLbfVnohiwdhQTFayrLEk
|
||||
MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
|
||||
c3RDQTAeFw0xOTExMTUwNzI2NThaFw0yMTExMTQwNzI2NThaMCoxFzAVBgNVBAMM
|
||||
DjAwMDQubm92YWxvY2FsMQ8wDQYDVQQKDAZzZXJ2ZXIwggEiMA0GCSqGSIb3DQEB
|
||||
AQUAA4IBDwAwggEKAoIBAQDC5JE48PJ/BFTLEseEbrGIdYB6w29hme4KFKmAqlLQ
|
||||
kpwwZJAsm/9iuXy6svJf7Tzzc173Jkgzw7DzhzSf1VgRDrOCQS+IU6s8UXfUMJt/
|
||||
AmP1SkU2mUJ/+pnEGRKtVkF9LCScinI95Iwt3xngdjMYXwk+S9Le3/8782ClBwZG
|
||||
vffXQ7hd5HnShgyqFVePgrKmr879NTylfvAWPwux2kdXNnbOHIrhcZm0NeMNf7hs
|
||||
UNURFlqo4rA0FV9dIHMryPkM7ygoaMog2XmcCnq/jf/MfPTQPYjQ9iLPOGrYi0pY
|
||||
X12uFb55duRGsvs7MIkNc8fn2VERoC69QX+GK+zAUGZ/AgMBAAGjLzAtMAkGA1Ud
|
||||
EwQCMAAwCwYDVR0PBAQDAgUgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA0GCSqGSIb3
|
||||
DQEBCwUAA4IBAQBpW7Ge5duo6/u3xIl0XhG/2dlSwlUUpO3Ecc13gmh44nJR66VH
|
||||
BEiimsol6gIgcSTk4pVY1DLb/09Nwv0TILl3Dc4QtXhM4gIlNRR79mLVsnPTef5e
|
||||
xkmesQaLihSCroHq8bONnO/Xgj5hCg8uI4j3vHtOikjABxQPOrCfc2uSrenU7aol
|
||||
1HBijCY6R+pg6WxBOZ2Teiaoxjn78IxSKLXW0pLRJIPpet1hefR0sKkmPfVGyg8H
|
||||
g7hqo+Houw8PQf2HLZnU656vyTlgIh6ES1x7Plb0cIw/LGr4rMkXs+DFg9SLbetT
|
||||
ncT4plfucsek7ImN9Dw2w2hM2FZwB8ycZfmu
|
||||
-----END CERTIFICATE-----
|
||||
|
|
|
@ -1,18 +1,18 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIIC6jCCAdKgAwIBAgIBAjANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
|
||||
c3RDQTAeFw0xNzExMDIxMzQyNDhaFw0xOTExMDIxMzQyNDhaMC0xGjAYBgNVBAMT
|
||||
EU1hY0Jvb2stQWlyLmxvY2FsMQ8wDQYDVQQKEwZjbGllbnQwggEiMA0GCSqGSIb3
|
||||
DQEBAQUAA4IBDwAwggEKAoIBAQC8GptpL25hv1Qa3jCn4VLvDRH/SrHg9wXvqRkz
|
||||
HuiKMxYT30m4+kcaXv350CJrkV+8lR24wdN7DBVewpCUnyUBbzkLccy1LUzunZ3z
|
||||
nm37j6cautD3rlC9gsC9d0uJ745FLx5t/6f1jMk9rWxn+4iSGAnkWC3mVaQxP1zQ
|
||||
q8GI97uob9HNb0OH6ygHJAcKOWB+85a29LIMa1uo/lT3hMr8sBg2vX+1F/gTusmW
|
||||
xVoQc9XJxBCs995qsH0UkZIuOY0XZp9/qFfcZv2QmslG8DojIIHKcujzu8bItE2M
|
||||
OyL5NlWLvN6qg59hHzF4+D+T+8GkhhKWSC+xdY14eQ5fB4S5AgMBAAGjLzAtMAkG
|
||||
A1UdEwQCMAAwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMCMA0GCSqG
|
||||
SIb3DQEBCwUAA4IBAQBLV4ZfhiKiFVnL/xO0MRGSKr3xd0LK64SW8Iw5DYkc0jNX
|
||||
sDrRbj2I/KJ/Rc4AeKT751L+C+KBzYpFgiLrxDmt/5pmgiFH51hPQtL7kRC0z2NY
|
||||
EY/P+u4IFVSo+b1hHYU7y+OMj6/Vvd4x0ETS4rHWI4mPDfGfvClEVLOktgRKrMU5
|
||||
9aTltF4U0FBUlYZTQBNBUFwBzj1+0lxK4EdhRmmWJ+uW9rgkQxpnUdbCPGvUKFRp
|
||||
3AbdHBAU9H2zVd2VZoJu6r7LMp6agxu0rYLgmamRAt+8rnDXvy7H1ZNdjT6fTbUO
|
||||
omVBMyJAc1+10gjpHw/EUD58t5/I5tZrnrANPgIs
|
||||
MIIC5zCCAc+gAwIBAgIBAjANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
|
||||
c3RDQTAeFw0xOTExMTUwNzI2NThaFw0yMTExMTQwNzI2NThaMCoxFzAVBgNVBAMM
|
||||
DjAwMDQubm92YWxvY2FsMQ8wDQYDVQQKDAZjbGllbnQwggEiMA0GCSqGSIb3DQEB
|
||||
AQUAA4IBDwAwggEKAoIBAQDcwo5SaoRpzkqy+Y9OADOL7U84h1VFfjb5Uu5raenO
|
||||
elmHSaCZpVP2EsDUaWavtabHd9fa5Oq6lOyZPDZM6xttfi78EV4RRfEJ4XdvE54W
|
||||
MZSDAGz4RwxfGOQWBSFyp1NrzT32eqeDSyBrE3jhWx9UUUMwthg5YYjCdBwK+Dwf
|
||||
hsfS1YeAfXPNO/BGSTe0dPhjLztXe1BkFO5VAwkSXaPs2lBJddOgpTTLXQ3+hIPL
|
||||
ozkiaTOMOvIMXsCspdhJbSc+jAAGZT5X9Tx7htYbPXIwyDJgeYGmLtr9XxPJ8XGR
|
||||
rpxkB3zASRcwQzsxTcwkG4E32T53tKsljTkNt15rIoo3AgMBAAGjLzAtMAkGA1Ud
|
||||
EwQCMAAwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMCMA0GCSqGSIb3
|
||||
DQEBCwUAA4IBAQBRtQMvUmiB84RmrGwHCP8hcGUWTz03mtTjGrykNA7YQkA09cRl
|
||||
RwiqYMWh6zHjdX1Ri3m9eIi/QSK/JX3S9zjZU9dSTtsdnMhkRL08kcxauv9gVXCG
|
||||
G1Vf+lUVJxTqwuAmcLiDNg9/89sSlxQXFS7Jn9TwTvNiRoFoN5IiJ4LsXyr4uS9Y
|
||||
S4Ul1aqetwpTV8bjpIbRJbOR8qBFshIZOPdgAT3RqbD/vpGzOvvV0c9g3VFLYoK3
|
||||
nQ63w1zhwYxC4MQD9rN7JRAKCDQBLNzf8PW0RSG9pVsf1IjaLxtsmQMgrAati/Ux
|
||||
AG76LAn9sodtb4GtV8E9ITG0pMNlJyUovstS
|
||||
-----END CERTIFICATE-----
|
||||
|
|
|
@ -1,27 +1,27 @@
|
|||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEowIBAAKCAQEAvBqbaS9uYb9UGt4wp+FS7w0R/0qx4PcF76kZMx7oijMWE99J
|
||||
uPpHGl79+dAia5FfvJUduMHTewwVXsKQlJ8lAW85C3HMtS1M7p2d855t+4+nGrrQ
|
||||
965QvYLAvXdLie+ORS8ebf+n9YzJPa1sZ/uIkhgJ5Fgt5lWkMT9c0KvBiPe7qG/R
|
||||
zW9Dh+soByQHCjlgfvOWtvSyDGtbqP5U94TK/LAYNr1/tRf4E7rJlsVaEHPVycQQ
|
||||
rPfearB9FJGSLjmNF2aff6hX3Gb9kJrJRvA6IyCBynLo87vGyLRNjDsi+TZVi7ze
|
||||
qoOfYR8xePg/k/vBpIYSlkgvsXWNeHkOXweEuQIDAQABAoIBAHnFV7peRDzvGUlT
|
||||
cXgcvA2ZDn+QIVsbTzJ466FWbv+YVsCCmj0veHwv5oakIMQ2Fh4FAnqqr3dGuUbg
|
||||
+avc4p3tHKa2Aul+7ADE9I3TkCt8MZdyPPk6VXZ5gMCmy7X96MIM4Mwg5uBlRZmx
|
||||
/S3Lffvlp/G0y/ICmwpulG1Z4y4A5Vc0Qf7fBO03Ekl31oReARnB6ex7RnDHH1mW
|
||||
RyLWNqyu9BhUbFpIyFPWDSkBcajNIbQ6qVJfGLm5Y2xVhwdqbyvY8M06uuMKz/IR
|
||||
SYfdIpiC4PpQZQzzXMn/6LTKWcCe0T+dBcWTZHC3C2abrC7+5fwFobs2xoUaCwz0
|
||||
1CclogECgYEA6Jdv+2VSYIBLbS0VIe07JiZaQNd1QNg63MK/y7oqAKEzYvpWzJel
|
||||
owPdBU3GxZH6vUUF7sCABcjumEDazoqTtzHQBo0xYpJrjmAL0ANNGVvF09pJK2eq
|
||||
yotxJJAS5/lQNSgWOxGVc6qu6ZpgeIXVLIx816yq04h10yVgZ2Lm3+ECgYEAzwj5
|
||||
/UVpN/ak6PwZ+Tq/8qOYjY2ABylRmP+T0Rkqmwh2B5Sp9oXjkDQwWseY0Wybhd8F
|
||||
kO6BUCMUApnB3uU0baawVbDUSrt43SkkKV9m3pA35wA3pYw1a56QIEFr56npFYBS
|
||||
sn9yl/ZtNvnuwmrHWOq8HdwPJsWREyO61yknn9kCgYB1PdixpSo4AJOErePoHRfi
|
||||
rBR0eObez+Aj5Xsea3G+rYMkkkHskUhp+omPodvfPS1h+If8CEbAI7+5OX/R+uJo
|
||||
xpAwrT1Gjb3vn5R0vyU+8havKmoVmgTqYg2fO4x8KBz5HoLONZfbHR9cG3gjaHrD
|
||||
IPHRGXVmeXPDAiUtGBp+oQKBgQDDRIAkNPdMZUCczknhG1w3Cb20pKUAHCRt3YAZ
|
||||
U1cv6gcIl1rGvPko5VBGDsM/ouP8m6CwVYN5hdw1p7eG9z8/vFvMNn/EDJWuYkNN
|
||||
EkH/4J4ZLcdOSLOJ0X+2LH4Nfd/s+58D49i9IxtXItviWruyTZMnxooz01tFZgmv
|
||||
LY3F4QKBgDirafhlJqFK6sa8WesHpD5+lm3Opzi4Ua8fAGHy2oHN3WCEL74q691C
|
||||
fA0P2UrzYiF7dXf4fgK9eMMQsdWS4nKyCSqM6xE4EAhAHUTYzY3ApNjI3XFDIrKC
|
||||
oQefIOLum2UyWFuEoUtrEfc5fxktiQohCwuAvwC59EwhmsNlECA8
|
||||
MIIEowIBAAKCAQEA3MKOUmqEac5KsvmPTgAzi+1POIdVRX42+VLua2npznpZh0mg
|
||||
maVT9hLA1Glmr7Wmx3fX2uTqupTsmTw2TOsbbX4u/BFeEUXxCeF3bxOeFjGUgwBs
|
||||
+EcMXxjkFgUhcqdTa8099nqng0sgaxN44VsfVFFDMLYYOWGIwnQcCvg8H4bH0tWH
|
||||
gH1zzTvwRkk3tHT4Yy87V3tQZBTuVQMJEl2j7NpQSXXToKU0y10N/oSDy6M5Imkz
|
||||
jDryDF7ArKXYSW0nPowABmU+V/U8e4bWGz1yMMgyYHmBpi7a/V8TyfFxka6cZAd8
|
||||
wEkXMEM7MU3MJBuBN9k+d7SrJY05DbdeayKKNwIDAQABAoIBAC6ww3Mw7iKGrAvg
|
||||
dmuz5TMSFPBKx0E0aaIf5Sc4tmeiPu87Jkl4yyI/YyNJy5scG1MSyMeWJQMjXksm
|
||||
jgGEtD9bMcrETZXvqgRB+IW4q3XcNKHkZCe6tyYh2JPDsAhU1XL2bMWFuYouSIP9
|
||||
EVLwd9bYfRJ/YO4577fY4Nl9GRI9hdOB0Y4dDvxHCprxXC/wH6NpvI5dktTPr2xl
|
||||
pNqABKdG8XEzP0duIpQf5zXbfDAWRUEpB9MDBXqmKmdjdPnpNS7JtkmCtWogdA9F
|
||||
LcyFI3e86qB9HHaqq1hBsQEG/DYj0RxCcAQFqTfvpxmZOXDlfWdB7M8xnqkD5xT0
|
||||
s6K1TXECgYEA79Lx5FFxfkN/uZKQzV1slJ/GSyfJqKhRh38/8G1ncmSG5dh0QMht
|
||||
Tt7FbFhYwGZQY9iMq1g/ujlHAzdKbFHGRX0z30xP7kf0R/L0W2yHMq59Ys4nUhGY
|
||||
o1v2sGxgDDP9XPNm/MV8DCZcoLMxvvFrfWLMYcvWTJb8TBGQgqpcEF8CgYEA66Zu
|
||||
d+l2W5LSTgwYeIAQuiIhhNLY9Ct94TWrum5QZMdeR+IUYn+dT817Qbmf4KiiihfJ
|
||||
V8t3tYgBBamNpqMKpm7An+HnFgRoV3o8W0pjlKdaQ0EiwhTQsLJcmZ1JV/k9Dd4V
|
||||
Rl26M0DZRKTHIUWLt7nNYexydQpfWlfRX1/n9SkCgYAKphUzjCI79wdO2CEx3Tob
|
||||
B1UotSWRJZgpKg9Ov6zeOXR79DaFQeEIpX+ipfGa6XAcXtswKIT74dszW1skoCTr
|
||||
pPmOqrbJ38wK/dC31oPSTkkm//xi+oEKj+TORKGnKQ/Q9sXV53bwmyt1vz8wOUwK
|
||||
jz6AASsMz494WTdPdf0MhQKBgQDGoBos6JPiy/aH4podt5Rhz7MBCdfkt2P7GAoP
|
||||
sjwBNiq53E3iWD54rXJfC98+teWLEFGdttrIIEL8StYixvqLHn8uRHNLk5t/YIDP
|
||||
UfxtqEHkvlpVzMW6qhxzPqg7htF3huHX1djEqrx3p4xQ9xW1Xt9G0s4G6R9GPw8z
|
||||
nNsfQQKBgF1nvj5xhD7fiVzS7NrjtBslDxKGQCfs9f1Xl2eadGp6pgwG/hvw5oO7
|
||||
gtoYJuPq+Zu92a+UDQVErXMHiXn3iza/3EOf2BP9zbq9mBGZtKmLExP0QGEmDygb
|
||||
Yo18YdfwWwqxvEf+jt2URv0w+KNWL/3j5rDmngNa1iNubX3p1AK6
|
||||
-----END RSA PRIVATE KEY-----
|
||||
|
|
|
@ -1,27 +1,27 @@
|
|||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEogIBAAKCAQEA1Dv5C92q91rKKTxddqgH/nMAx5JRiSMSbI3aGV4CNZTxHtVt
|
||||
UF8G/xv7cn+hY9ifAg+W37EYw8QHphSAVaxcJbpwpPhrOsUe788+DhaJ6I+KZpXq
|
||||
7i0KzWP2x2tljfeznSEsl84ToA8EmqqG5xdJ/39DMfvbHRzt6fCzQdyV1MgacAza
|
||||
/003wG+KW0BLtyxeKe7sndXi4Ss1MUolsyrYwdRGDmwPO32btLQSWYhro7hYiO7X
|
||||
+KlinrgKEWv3F5IFWYZNLrWGrg6Gu6gWeTumXRJKPaL9PouftSRlP8OcK0jzQJzN
|
||||
pWFVJ9JCZnFdoMGuEWRY3aqfvxcfh5ym54WRJwIDAQABAoIBABNq2UJIqZev6scT
|
||||
CsoMXY7eHrgjnuoZF1pvMAEaJMGaOuVDSZkM2KsGeF7lZnKoIwQhQQB+R3HBwaFk
|
||||
RsmP125sPFobkFP0LPxrzZWkYkGwwEzacoAQBuj7uFxOayAuBXTe0CGjbRA7z4QH
|
||||
DgiejNqfXhp4nHdxaiL5Lq1b7SlmarGXup3kcVTWxIiah4MK0o4YGiyQC8Mr+a7w
|
||||
UGYqdKQQMLOtly/HTEcyd/DAruboV+5L+pYx/pcFFXJupK6yaxELLHKeHAKA9MmA
|
||||
cnMNVpCQ8VdOyR9qrfwtABqd8egKea2Z3P+dK6PlxUAQe2kYlXxS0N+i/eU6PKYM
|
||||
B76UhQECgYEA9GvBNG6ffQqkX6bNLQUsU+nvKAQeFq02ua9LFKYw2sVO6RfUjrNz
|
||||
u2cwAUXSp+tnPMesKEVOOUfRMN/QiI/JNw62uSWSKJ/64103vX+F5AjQmE2f7Zgt
|
||||
o3X23cV544HM8E5xCvIe7DFLK6cUdRQngu/uWi63cB4hMVpB9MfZJscCgYEA3kne
|
||||
2sE4b67JkjmHGKahBJM5/iAHBqSubQmufIlaiLkyrDYGN2D+mi0fAF+uQ9KmNOrv
|
||||
TsZ1bZu9f+VvaH7xNJzcUriXYs+HoN9/CWnAR0ktSm8RN7BznVd41NuLnsoWUt41
|
||||
jglpNYMwy7JPRLQNgYHErG7puksNawFvSKQEYqECgYA4N/iueKtSdXotTg5vRntV
|
||||
qb8KczgAe0LVHs6kJz2hdDScRJDtabU665cNE+RKH0kVn8+nS5mcbzpchX5PitL7
|
||||
SPUaTNv7YCCy3yQNACHpu2VPQruASLpmmKF5jQxmGdrrgv9ZRyt5pDToC3wXGdWk
|
||||
tk8aixhCP4ve8CWvibAWzQKBgExxxwwf6tKtn3CEDCu0EifKoeT9Cq2EMOAatkDp
|
||||
05K1bfG/Wn/tAWHwJnswbHOym6oTKV1D7tpU9uRm+NtM3JKlZzejd5xpllECy2Nn
|
||||
VNKvHb49WAR40CnKDSnWnrtq8CZreKtyHRZkGYHTvmL4MLTa9dH/Cq4gZWrpQWYP
|
||||
0dpBAoGAD4inpSm7SMN3/rgYXEU1CMRKXREbEWhondiTXZ8x8ugnnYtfhcBvCMif
|
||||
JQ8tso63hCHvKPDViTbLDyV7OuGBEPTQAyacX0FJmr7g5ERlvfmL4yjmvW7Bcclh
|
||||
yrgbJXl2pdzMt9GpogIYFW0YyOr6VPIrGf62kRNrv2E8wyXEFAI=
|
||||
MIIEpQIBAAKCAQEAwuSROPDyfwRUyxLHhG6xiHWAesNvYZnuChSpgKpS0JKcMGSQ
|
||||
LJv/Yrl8urLyX+0883Ne9yZIM8Ow84c0n9VYEQ6zgkEviFOrPFF31DCbfwJj9UpF
|
||||
NplCf/qZxBkSrVZBfSwknIpyPeSMLd8Z4HYzGF8JPkvS3t//O/NgpQcGRr3310O4
|
||||
XeR50oYMqhVXj4Kypq/O/TU8pX7wFj8LsdpHVzZ2zhyK4XGZtDXjDX+4bFDVERZa
|
||||
qOKwNBVfXSBzK8j5DO8oKGjKINl5nAp6v43/zHz00D2I0PYizzhq2ItKWF9drhW+
|
||||
eXbkRrL7OzCJDXPH59lREaAuvUF/hivswFBmfwIDAQABAoIBAQCYa9gj11Vf/0wt
|
||||
kh9WNJhGJ9d2q5hVleR0H9q9FPg1xSPAOTYEnXBrjrO89CzY1xq/L7DKzDbVvSuM
|
||||
GmcOxfTdSkkcCs0Y6o7WWsTDv8ws1frFIPPmkpBOtPhDRHS1+eq38akkgKZ+P1te
|
||||
mMiNIwQtAE6jWPuvcTIVee9QwaCn+5ZYIwICORNFoLsl7sKdLOfccSO7v9L/Az5r
|
||||
AT4xrJwpKl5MjOGzOxFv6M1rTh/Y9e17U+2/QQDnW4U7C4/gkQ1urJddaeDDnz8t
|
||||
GLAnshCdF8eL3vAKO6sMJiEGuVe3b2oBYrRjp7FSB1uLWWlFRb7TGE07UXP0JZDn
|
||||
W1lmUbcBAoGBAO4/37Obk1pM6GQzS49AwLJtz5Z9DpxMSaVW6XHQlOq6RBNQsMR0
|
||||
MS5k5TZgX0HZXAu0dGaPNzD7868dwTZE7tn6a1QanfmrqQVbJxHWTJtPEE0QGpGI
|
||||
vg2D6iiYUE0mVEiKf7dNp6hpp9ioYIdsRQK0H/u3sU/JfEFr00XpMb+BAoGBANFp
|
||||
wMIcB7RbyShO8QR/kVpahlOOnNDP7e+9KdUFl8i300ecO2QNR+hlQ+565J8nsANj
|
||||
Y2kLMls2DTzMefrEZPecb8onjGFmSkwf9uCs8vmorlYmYmNlJkLL06ZN3SrpmHBD
|
||||
GogkCt1qkrTgtszjSqZe94UcpT+mfatSK4lRlSX/AoGAXdE7Ns/Ji6KDVIm6dFOs
|
||||
TdbeCsV+DmAgFAKQdKgNLA1jJzP8F7Aleb5zYCE9AYIlM9rAh25X7msYf1m5LrSg
|
||||
Vae9weWlVZ6aNSi6ztRTYEkXAzGXNL3jERFkEM5BuM+iGtqnBjiHD9NjK/bJ5Cnn
|
||||
VvQ1L/sa0G9oBZ7/GCWG2IECgYEAqAOmENb2Y4FEyl9Txl0nXIvGvCFutaYt66wk
|
||||
dPIQzoyWKh0yFVsGd3FP6HWXGg54jK9gIfZGx6F9S2tu7oBF1dggZNwIKFkugRcg
|
||||
NzDrnNz2Ss5vH/oWkX8BZ6uPKA/VKzTbg6EPSohn/lFQuOAfk44cHyNVfdTxfNPn
|
||||
dDwNYzcCgYEA3Wig1HRNvTOnSskFz8eTmpu89hg1atuAk+c2d1Z+9HKVjkc6B/91
|
||||
pabnbujERtH3HW9TQ9+VVfImgC3Jy+TjsS3d6nA7e9060N9z30mVEY5lq03mKAMl
|
||||
tSKFk4fRRxsKPsBN/NS0BiU8LJTzsDwLwRm9T4BNos+I35a8tFCCmtw=
|
||||
-----END RSA PRIVATE KEY-----
|
||||
|
|
|
@ -181,6 +181,11 @@ node.name = emqx@127.0.0.1
|
|||
## Value: String
|
||||
node.cookie = emqxsecretcookie
|
||||
|
||||
## Node Max Clients Size.
|
||||
##
|
||||
## Value: String
|
||||
node.max_clients = 1024000
|
||||
|
||||
## Data dir for the node
|
||||
##
|
||||
## Value: Folder
|
||||
|
@ -942,7 +947,7 @@ listener.tcp.external.access.1 = allow all
|
|||
## Enable the option for X.509 certificate based authentication.
|
||||
## EMQX will use the common name of certificate as MQTT username.
|
||||
##
|
||||
## Value: cn | dn
|
||||
## Value: cn | dn | crt
|
||||
## listener.tcp.external.peer_cert_as_username = cn
|
||||
|
||||
## The TCP backlog defines the maximum length that the queue of pending
|
||||
|
@ -1293,10 +1298,10 @@ listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
|
|||
## Value: on | off
|
||||
## listener.ssl.external.honor_cipher_order = on
|
||||
|
||||
## Use the CN, EN or CRT field from the client certificate as a username.
|
||||
## Use the CN, DN or CRT field from the client certificate as a username.
|
||||
## Notice that 'verify' should be set as 'verify_peer'.
|
||||
##
|
||||
## Value: cn | en | crt
|
||||
## Value: cn | dn | crt
|
||||
## listener.ssl.external.peer_cert_as_username = cn
|
||||
|
||||
## TCP backlog for the SSL connection.
|
||||
|
|
|
@ -41,5 +41,5 @@
|
|||
|
||||
-define(LOG(Level, Format, Args),
|
||||
begin
|
||||
(logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end}))
|
||||
(logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}))
|
||||
end).
|
||||
|
|
|
@ -334,6 +334,13 @@ end}.
|
|||
hidden
|
||||
]}.
|
||||
|
||||
%% @see node.max_clients
|
||||
{mapping, "node.max_clients", "emqx.max_clients", [
|
||||
{default, 1024000},
|
||||
{datatype, integer},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% RPC
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
{gproc, "0.8.0"}, % hex
|
||||
{replayq, "0.1.1"}, %hex
|
||||
{esockd, "5.5.0"}, %hex
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.8"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.9"}}},
|
||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
|
||||
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
||||
]}.
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
%%-*- mode: erlang -*-
|
||||
%% .app.src.script
|
||||
|
||||
Config = case os:getenv("EMQX_DESC") of
|
||||
false -> CONFIG; % env var not defined
|
||||
[] -> CONFIG; % env var set to empty string
|
||||
Desc ->
|
||||
[begin
|
||||
AppConf0 = lists:keystore(description, 1, AppConf, {description, Desc}),
|
||||
{application, App, AppConf0}
|
||||
end || Conf = {application, App, AppConf} <- CONFIG]
|
||||
end,
|
||||
|
||||
RemoveLeadingV =
|
||||
fun(Tag) ->
|
||||
case re:run(Tag, "v\[0-9\]+\.\[0-9\]+\.*") of
|
||||
nomatch ->
|
||||
Tag;
|
||||
{match, _} ->
|
||||
%% if it is a version number prefixed by 'v' then remove the 'v'
|
||||
"v" ++ Vsn = Tag,
|
||||
Vsn
|
||||
end
|
||||
end,
|
||||
|
||||
case os:getenv("EMQX_DEPS_DEFAULT_VSN") of
|
||||
false -> Config; % env var not defined
|
||||
[] -> Config; % env var set to empty string
|
||||
Tag ->
|
||||
[begin
|
||||
AppConf0 = lists:keystore(vsn, 1, AppConf, {vsn, RemoveLeadingV(Tag)}),
|
||||
{application, App, AppConf0}
|
||||
end || Conf = {application, App, AppConf} <- Config]
|
||||
end.
|
|
@ -256,7 +256,7 @@ aggre(Routes) ->
|
|||
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
|
||||
-> emqx_types:deliver_result()).
|
||||
forward(Node, To, Delivery, async) ->
|
||||
case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||
case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||
true -> ok;
|
||||
{badrpc, Reason} ->
|
||||
?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
|
||||
|
@ -264,7 +264,7 @@ forward(Node, To, Delivery, async) ->
|
|||
end;
|
||||
|
||||
forward(Node, To, Delivery, sync) ->
|
||||
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||
case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||
{badrpc, Reason} ->
|
||||
?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
|
||||
{error, badrpc};
|
||||
|
|
|
@ -149,7 +149,14 @@ call(CPid, Req) ->
|
|||
|
||||
init({Transport, RawSocket, Options}) ->
|
||||
process_flag(trap_exit, true),
|
||||
{ok, Socket} = Transport:wait(RawSocket),
|
||||
case Transport:wait(RawSocket) of
|
||||
{ok, Socket} ->
|
||||
do_init(Transport, Socket, Options);
|
||||
{error, Reason} ->
|
||||
?LOG(warning, "connection failed to establish: ~p", [Reason])
|
||||
end.
|
||||
|
||||
do_init(Transport, Socket, Options) ->
|
||||
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
|
||||
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
|
||||
Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
|
||||
|
@ -352,11 +359,11 @@ handle(info, {timeout, Timer, emit_stats},
|
|||
{keep_state, NState#state{gc_state = GcState1}, hibernate};
|
||||
{shutdown, Reason} ->
|
||||
?LOG(error, "Shutdown exceptionally due to ~p", [Reason]),
|
||||
shutdown(Reason, NState)
|
||||
self() ! {shutdown, Reason}
|
||||
end;
|
||||
|
||||
handle(info, {shutdown, discard, {ClientId, ByPid}}, State) ->
|
||||
?LOG(error, "Discarded by ~s:~p", [ClientId, ByPid]),
|
||||
?LOG(warning, "Discarded by ~s:~p", [ClientId, ByPid]),
|
||||
shutdown(discard, State);
|
||||
|
||||
handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) ->
|
||||
|
|
|
@ -44,6 +44,8 @@
|
|||
|
||||
-export([lookup_conn_pid/1]).
|
||||
|
||||
-export([max_client_size/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([ init/1
|
||||
, handle_call/3
|
||||
|
@ -148,6 +150,9 @@ lookup_conn_pid(ClientId) when is_binary(ClientId) ->
|
|||
notify(Msg) ->
|
||||
gen_server:cast(?CM, {notify, Msg}).
|
||||
|
||||
max_client_size() ->
|
||||
ets:info(?CONN_TAB, size).
|
||||
|
||||
%%-----------------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%-----------------------------------------------------------------------------
|
||||
|
|
|
@ -115,7 +115,7 @@ run_fold(HookPoint, Args, Acc) ->
|
|||
|
||||
|
||||
do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
|
||||
case filter_passed(Filter, Args) andalso execute(Action, Args) of
|
||||
case filter_passed(Filter, Args) andalso safe_execute(Action, Args) of
|
||||
%% stop the hook chain and return
|
||||
stop -> ok;
|
||||
%% continue the hook chain, in following cases:
|
||||
|
@ -128,7 +128,7 @@ do_run([], _Args) ->
|
|||
|
||||
do_run_fold([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
|
||||
Args1 = Args ++ [Acc],
|
||||
case filter_passed(Filter, Args1) andalso execute(Action, Args1) of
|
||||
case filter_passed(Filter, Args1) andalso safe_execute(Action, Args1) of
|
||||
%% stop the hook chain
|
||||
stop -> Acc;
|
||||
%% stop the hook chain with NewAcc
|
||||
|
@ -148,6 +148,15 @@ filter_passed(undefined, _Args) -> true;
|
|||
filter_passed(Filter, Args) ->
|
||||
execute(Filter, Args).
|
||||
|
||||
safe_execute(Fun, Args) ->
|
||||
try execute(Fun, Args) of
|
||||
Result -> Result
|
||||
catch
|
||||
_:Reason:Stacktrace ->
|
||||
?LOG(error, "Failed to execute ~p(~p): ~p", [Fun, Args, {Reason, Stacktrace}]),
|
||||
ok
|
||||
end.
|
||||
|
||||
%% @doc execute a function.
|
||||
execute(Fun, Args) when is_function(Fun) ->
|
||||
erlang:apply(Fun, Args);
|
||||
|
|
|
@ -30,8 +30,7 @@ init([]) ->
|
|||
child_spec(emqx_stats, worker),
|
||||
child_spec(emqx_metrics, worker),
|
||||
child_spec(emqx_ctl, worker),
|
||||
child_spec(emqx_zone, worker),
|
||||
child_spec(emqx_tracer, worker)]}}.
|
||||
child_spec(emqx_zone, worker)]}}.
|
||||
|
||||
child_spec(M, worker) ->
|
||||
#{id => M,
|
||||
|
|
|
@ -75,7 +75,7 @@ format(#{level:=Level,msg:=Msg0,meta:=Meta},Config0)
|
|||
end,
|
||||
Config#{chars_limit=>Size}
|
||||
end,
|
||||
string:trim(format_msg(Msg0,Meta,Config1));
|
||||
format_msg(Msg0,Meta,Config1);
|
||||
true ->
|
||||
""
|
||||
end,
|
||||
|
@ -134,7 +134,7 @@ to_string(X,_) when is_list(X) ->
|
|||
_ -> io_lib:format(?FormatP,[X])
|
||||
end;
|
||||
to_string(X,_) ->
|
||||
io_lib:format("~s",[X]).
|
||||
io_lib:format(?FormatP,[X]).
|
||||
|
||||
printable_list([]) ->
|
||||
false;
|
||||
|
@ -196,7 +196,7 @@ do_format_msg({Format0,Args},Depth,Opts) ->
|
|||
%% already been here - avoid failing cyclically
|
||||
erlang:raise(C,R,S);
|
||||
_ ->
|
||||
format_msg({FormatError,[Format0,Args]},Depth,Opts)
|
||||
do_format_msg({FormatError,[Format0,Args]},Depth,Opts)
|
||||
end
|
||||
end.
|
||||
|
||||
|
|
|
@ -294,12 +294,12 @@ do_inc_recv(?PACKET(?DISCONNECT)) ->
|
|||
do_inc_recv(?PACKET(?AUTH)) ->
|
||||
inc('packets.auth.received');
|
||||
do_inc_recv(_Packet) ->
|
||||
ignore.
|
||||
ok.
|
||||
|
||||
%% @doc Inc packets sent. Will not count $SYS PUBLISH.
|
||||
-spec(inc_sent(emqx_mqtt_types:packet()) -> ok | ignore).
|
||||
-spec(inc_sent(emqx_mqtt_types:packet()) -> ok).
|
||||
inc_sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) ->
|
||||
ignore;
|
||||
ok;
|
||||
inc_sent(Packet) ->
|
||||
inc('packets.sent'),
|
||||
do_inc_sent(Packet).
|
||||
|
@ -341,7 +341,7 @@ do_inc_sent(?PACKET(?DISCONNECT)) ->
|
|||
do_inc_sent(?PACKET(?AUTH)) ->
|
||||
inc('packets.auth.sent');
|
||||
do_inc_sent(_Packet) ->
|
||||
ignore.
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
|
@ -356,7 +356,7 @@ init([]) ->
|
|||
% Store reserved indices
|
||||
lists:foreach(fun({Type, Name}) ->
|
||||
Idx = reserved_idx(Name),
|
||||
Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)},
|
||||
Metric = #metric{name = Name, type = Type, idx = Idx},
|
||||
true = ets:insert(?TAB, Metric),
|
||||
ok = counters:put(CRef, Idx, 0)
|
||||
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS),
|
||||
|
|
|
@ -15,9 +15,7 @@
|
|||
-module(emqx_mountpoint).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-logger_header("[Mountpoint]").
|
||||
-include("types.hrl").
|
||||
|
||||
-export([ mount/2
|
||||
, unmount/2
|
||||
|
@ -32,30 +30,34 @@
|
|||
%%------------------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
mount(undefined, Any) ->
|
||||
Any;
|
||||
mount(MountPoint, Topic) when is_binary(Topic) ->
|
||||
prefix(MountPoint, Topic);
|
||||
mount(MountPoint, Msg = #message{topic = Topic}) ->
|
||||
Msg#message{topic = <<MountPoint/binary, Topic/binary>>};
|
||||
|
||||
Msg#message{topic = prefix(MountPoint, Topic)};
|
||||
mount(MountPoint, TopicFilters) when is_list(TopicFilters) ->
|
||||
[{<<MountPoint/binary, Topic/binary>>, SubOpts} || {Topic, SubOpts} <- TopicFilters].
|
||||
[{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters].
|
||||
|
||||
unmount(undefined, Msg) ->
|
||||
Msg;
|
||||
unmount(undefined, Any) ->
|
||||
Any;
|
||||
unmount(MountPoint, Topic) when is_binary(Topic) ->
|
||||
case string:prefix(Topic, MountPoint) of
|
||||
nomatch -> Topic;
|
||||
Topic1 -> Topic1
|
||||
end;
|
||||
unmount(MountPoint, Msg = #message{topic = Topic}) ->
|
||||
try split_binary(Topic, byte_size(MountPoint)) of
|
||||
{MountPoint, Topic1} -> Msg#message{topic = Topic1}
|
||||
catch
|
||||
_Error:Reason ->
|
||||
?LOG(error, "Unmount error : ~p", [Reason]),
|
||||
Msg
|
||||
case string:prefix(Topic, MountPoint) of
|
||||
nomatch -> Msg;
|
||||
Topic1 -> Msg#message{topic = Topic1}
|
||||
end.
|
||||
|
||||
-spec(replvar(maybe(mountpoint()), map()) -> maybe(mountpoint())).
|
||||
replvar(undefined, _Vars) ->
|
||||
undefined;
|
||||
replvar(MountPoint, #{client_id := ClientId, username := Username}) ->
|
||||
lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
|
||||
lists:foldl(fun feed_var/2, MountPoint,
|
||||
[{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
|
||||
|
||||
feed_var({<<"%c">>, ClientId}, MountPoint) ->
|
||||
emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
|
||||
|
@ -64,3 +66,5 @@ feed_var({<<"%u">>, undefined}, MountPoint) ->
|
|||
feed_var({<<"%u">>, Username}, MountPoint) ->
|
||||
emqx_topic:feed_var(<<"%u">>, Username, MountPoint).
|
||||
|
||||
prefix(MountPoint, Topic) ->
|
||||
<<MountPoint/binary, Topic/binary>>.
|
|
@ -126,31 +126,13 @@ default_caps() ->
|
|||
?DEFAULT_CAPS.
|
||||
|
||||
get_caps(Zone, publish) ->
|
||||
with_env(Zone, '$mqtt_pub_caps',
|
||||
fun() ->
|
||||
filter_caps(?PUBCAP_KEYS, get_caps(Zone))
|
||||
end);
|
||||
filter_caps(?PUBCAP_KEYS, get_caps(Zone));
|
||||
|
||||
get_caps(Zone, subscribe) ->
|
||||
with_env(Zone, '$mqtt_sub_caps',
|
||||
fun() ->
|
||||
filter_caps(?SUBCAP_KEYS, get_caps(Zone))
|
||||
end).
|
||||
filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
|
||||
|
||||
get_caps(Zone) ->
|
||||
with_env(Zone, '$mqtt_caps',
|
||||
fun() ->
|
||||
maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)}
|
||||
|| {Cap, Def} <- ?DEFAULT_CAPS])
|
||||
end).
|
||||
maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)} || {Cap, Def} <- ?DEFAULT_CAPS]).
|
||||
|
||||
filter_caps(Keys, Caps) ->
|
||||
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
|
||||
|
||||
with_env(Zone, Key, InitFun) ->
|
||||
case emqx_zone:get_env(Zone, Key) of
|
||||
undefined -> Caps = InitFun(),
|
||||
ok = emqx_zone:set_env(Zone, Key, Caps),
|
||||
Caps;
|
||||
ZoneCaps -> ZoneCaps
|
||||
end.
|
||||
|
|
|
@ -166,4 +166,7 @@ call(Req) ->
|
|||
gen_server:call(?OS_MON, Req, infinity).
|
||||
|
||||
ensure_check_timer(State = #{cpu_check_interval := Interval}) ->
|
||||
State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}.
|
||||
case erlang:system_info(system_architecture) of
|
||||
"x86_64-pc-linux-musl" -> State;
|
||||
_ -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}
|
||||
end.
|
|
@ -162,10 +162,11 @@ will_msg(#mqtt_packet_connect{client_id = ClientId,
|
|||
will_qos = QoS,
|
||||
will_topic = Topic,
|
||||
will_props = Properties,
|
||||
will_payload = Payload}) ->
|
||||
will_payload = Payload,
|
||||
proto_ver = ProtoVer}) ->
|
||||
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
|
||||
Msg#message{flags = #{dup => false, retain => Retain},
|
||||
headers = merge_props(#{username => Username}, Properties)}.
|
||||
headers = merge_props(#{username => Username, proto_ver => ProtoVer}, Properties)}.
|
||||
|
||||
merge_props(Headers, undefined) ->
|
||||
Headers;
|
||||
|
|
|
@ -260,10 +260,22 @@ set_protover(_Packet, PState) ->
|
|||
received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
|
||||
{error, proto_not_connected, PState};
|
||||
|
||||
received(Packet = ?PACKET(?CONNECT), PState = #pstate{connected = false}) ->
|
||||
case check_max_clients() of
|
||||
true ->
|
||||
?LOG(error, "Connection rejected due to max clients limitation"),
|
||||
connack({?RC_QUOTA_EXCEEDED, PState#pstate{credentials = credentials(PState)}});
|
||||
false ->
|
||||
do_received(Packet, PState)
|
||||
end;
|
||||
|
||||
received(?PACKET(?CONNECT), PState = #pstate{connected = true}) ->
|
||||
{error, proto_unexpected_connect, PState};
|
||||
|
||||
received(Packet = ?PACKET(Type), PState) ->
|
||||
received(Packet, PState) ->
|
||||
do_received(Packet, PState).
|
||||
|
||||
do_received(Packet = ?PACKET(Type), PState) ->
|
||||
trace(recv, Packet),
|
||||
PState1 = set_protover(Packet, PState),
|
||||
try emqx_packet:validate(Packet) of
|
||||
|
@ -562,10 +574,10 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Creden
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
|
||||
PState = #pstate{session = SPid, credentials = Credentials}) ->
|
||||
PState = #pstate{session = SPid, credentials = Credentials, proto_ver = ProtoVer}) ->
|
||||
Msg = emqx_mountpoint:mount(mountpoint(Credentials),
|
||||
emqx_packet:to_message(Credentials, Packet)),
|
||||
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState).
|
||||
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, emqx_message:set_header(proto_ver, ProtoVer, Msg))), PState).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Puback -> Client
|
||||
|
@ -834,8 +846,8 @@ check_will_retain(#mqtt_packet_connect{will_retain = false, proto_ver = ?MQTT_PR
|
|||
ok;
|
||||
check_will_retain(#mqtt_packet_connect{will_retain = true, proto_ver = ?MQTT_PROTO_V5}, #pstate{zone = Zone}) ->
|
||||
case emqx_zone:get_env(Zone, mqtt_retain_available, true) of
|
||||
true -> {error, ?RC_RETAIN_NOT_SUPPORTED};
|
||||
false -> ok
|
||||
true -> ok;
|
||||
false -> {error, ?RC_RETAIN_NOT_SUPPORTED}
|
||||
end;
|
||||
check_will_retain(_Packet, _PState) ->
|
||||
ok.
|
||||
|
@ -1027,7 +1039,7 @@ raw_topic_filters(#pstate{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridg
|
|||
end.
|
||||
|
||||
mountpoint(Credentials) ->
|
||||
maps:get(mountpoint, Credentials, undefined).
|
||||
emqx_mountpoint:replvar(maps:get(mountpoint, Credentials, undefined), Credentials).
|
||||
|
||||
do_check_banned(_EnableBan = true, Credentials) ->
|
||||
case emqx_banned:check(Credentials) of
|
||||
|
@ -1048,3 +1060,8 @@ do_acl_check(Action, Credentials, Topic, AllowTerm, DenyTerm) ->
|
|||
allow -> AllowTerm;
|
||||
deny -> DenyTerm
|
||||
end.
|
||||
|
||||
check_max_clients() ->
|
||||
CurrentClientSize = emqx_cm:max_client_size(),
|
||||
MaxClients = emqx_config:get_env(max_clients, 1024000),
|
||||
CurrentClientSize >= MaxClients.
|
|
@ -16,24 +16,41 @@
|
|||
-module(emqx_rpc).
|
||||
|
||||
-export([ call/4
|
||||
, call/5
|
||||
, cast/4
|
||||
, cast/5
|
||||
, multicall/4
|
||||
, multicall/5
|
||||
]).
|
||||
|
||||
-define(RPC, gen_rpc).
|
||||
|
||||
-define(DefaultClientNum, 1).
|
||||
|
||||
call(Node, Mod, Fun, Args) ->
|
||||
filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)).
|
||||
|
||||
call(Key, Node, Mod, Fun, Args) ->
|
||||
filter_result(?RPC:call(rpc_node({Key, Node}), Mod, Fun, Args)).
|
||||
|
||||
multicall(Nodes, Mod, Fun, Args) ->
|
||||
filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).
|
||||
|
||||
multicall(Key, Nodes, Mod, Fun, Args) ->
|
||||
filter_result(?RPC:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)).
|
||||
|
||||
cast(Node, Mod, Fun, Args) ->
|
||||
filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).
|
||||
|
||||
rpc_node(Node) ->
|
||||
{ok, ClientNum} = application:get_env(gen_rpc, tcp_client_num),
|
||||
{Node, rand:uniform(ClientNum)}.
|
||||
cast(Key, Node, Mod, Fun, Args) ->
|
||||
filter_result(?RPC:cast(rpc_node({Key, Node}), Mod, Fun, Args)).
|
||||
|
||||
rpc_node(Node) when is_atom(Node) ->
|
||||
ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
|
||||
{Node, rand:uniform(ClientNum)};
|
||||
rpc_node({Key, Node}) when is_atom(Node) ->
|
||||
ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
|
||||
{Node, erlang:phash2(Key, ClientNum) + 1}.
|
||||
|
||||
rpc_nodes(Nodes) ->
|
||||
rpc_nodes(Nodes, []).
|
||||
|
@ -43,7 +60,6 @@ rpc_nodes([], Acc) ->
|
|||
rpc_nodes([Node | Nodes], Acc) ->
|
||||
rpc_nodes(Nodes, [rpc_node(Node) | Acc]).
|
||||
|
||||
|
||||
filter_result({Error, Reason})
|
||||
when Error =:= badrpc; Error =:= badtcp ->
|
||||
{badrpc, Reason};
|
||||
|
|
|
@ -401,13 +401,22 @@ handle_call(stats, _From, State) ->
|
|||
reply(stats(State), State);
|
||||
|
||||
handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
|
||||
?LOG(warning, "Discarded by ~p", [ByPid]),
|
||||
?LOG(notice, "Discarded by ~p", [ByPid]),
|
||||
{stop, {shutdown, discarded}, ok, State};
|
||||
|
||||
handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
|
||||
?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid]),
|
||||
handle_call({discard, ByPid}, _From, State = #state{conn_pid = ConnPid, client_id = ClientId}) ->
|
||||
?LOG(notice, "Conn ~p is discarded by ~p", [ConnPid, ByPid]),
|
||||
case ClientId of
|
||||
<<"d:", _Sn/binary>> ->
|
||||
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
|
||||
{stop, {shutdown, discarded}, ok, State};
|
||||
_ ->
|
||||
Topic = <<"$SYS/kickout">>,
|
||||
Msg = emqx_message:make(broker, 1, Topic, <<"The client has been kicked out">>),
|
||||
{_, State1} = handle_dispatch([{Topic, Msg}], State),
|
||||
erlang:send_after(5000, self(), {kicked, ByPid}),
|
||||
{reply, ok, State1}
|
||||
end;
|
||||
|
||||
%% PUBLISH: This is only to register packetId to session state.
|
||||
%% The actual message dispatching should be done by the caller (e.g. connection) process.
|
||||
|
@ -463,12 +472,17 @@ handle_call(Req, _From, State) ->
|
|||
|
||||
%% SUBSCRIBE:
|
||||
handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
|
||||
State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
|
||||
State = #state{zone = Zone, client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
|
||||
MaxSub = get_env(Zone, max_subscriptions, 0),
|
||||
{ReasonCodes, Subscriptions1} =
|
||||
lists:foldr(
|
||||
fun ({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) when
|
||||
RC == ?QOS_0; RC == ?QOS_1; RC == ?QOS_2 ->
|
||||
{[QoS|RcAcc], do_subscribe(ClientId, Username, Topic, SubOpts, SubMap)};
|
||||
fun ({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) when ?IS_QOS(RC) ->
|
||||
case exceeded_subscription_quota(MaxSub, SubMap) of
|
||||
true ->
|
||||
{[?RC_QUOTA_EXCEEDED|RcAcc], SubMap};
|
||||
false ->
|
||||
{[QoS|RcAcc], do_subscribe(ClientId, Username, Topic, SubOpts, SubMap)}
|
||||
end;
|
||||
({_Topic, #{rc := RC}}, {RcAcc, SubMap}) ->
|
||||
{[RC|RcAcc], SubMap}
|
||||
end, {[], Subscriptions}, TopicFilters),
|
||||
|
@ -493,16 +507,23 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
|||
noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1}));
|
||||
|
||||
%% PUBACK:
|
||||
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
||||
noreply(
|
||||
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight,
|
||||
client_id = ClientId,
|
||||
conn_pid = ConnPid}) ->
|
||||
case emqx_inflight:contain(PacketId, Inflight) of
|
||||
true ->
|
||||
ensure_stats_timer(dequeue(acked(puback, PacketId, State)));
|
||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||
{value, {publish, {_, #message{topic = <<"$SYS/kickout">>}}, _Ts}} ->
|
||||
ConnPid ! {shutdown, discard, {ClientId, undefined}},
|
||||
{stop, {shutdown, discarded}, State};
|
||||
_ ->
|
||||
{noreply, ensure_stats_timer(dequeue(acked(puback, PacketId, State)))}
|
||||
end;
|
||||
false ->
|
||||
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
|
||||
ok = emqx_metrics:inc('packets.puback.missed'),
|
||||
State
|
||||
end);
|
||||
{noreply, State}
|
||||
end;
|
||||
|
||||
%% PUBCOMP:
|
||||
handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
||||
|
@ -603,7 +624,7 @@ handle_info({timeout, Timer, emit_stats},
|
|||
{noreply, NewState#state{gc_state = GcState1}, hibernate};
|
||||
{shutdown, Reason} ->
|
||||
?LOG(warning, "Shutdown exceptionally due to ~p", [Reason]),
|
||||
shutdown(Reason, NewState)
|
||||
self() ! {shutdown, Reason}
|
||||
end;
|
||||
|
||||
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
|
||||
|
@ -646,6 +667,13 @@ handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
|
|||
[ConnPid, Pid, Reason]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({shutdown, Reason}, State) ->
|
||||
shutdown(Reason, State);
|
||||
|
||||
handle_info({kicked, ByPid}, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
|
||||
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
|
||||
{stop, {shutdown, discarded}, State};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
@ -877,12 +905,14 @@ process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #sta
|
|||
true -> process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State);
|
||||
false -> process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State)
|
||||
end;
|
||||
process_subopts([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, State = #state{}) ->
|
||||
process_subopts(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, State);
|
||||
process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags}, State = #state{}) ->
|
||||
process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, State);
|
||||
process_subopts([{rap, _}|Opts], Msg, State) ->
|
||||
process_subopts(Opts, Msg, State);
|
||||
process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags, headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) ->
|
||||
process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session);
|
||||
process_subopts([{rap, _}|Opts], Msg = #message{headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) ->
|
||||
process_subopts(Opts, Msg, Session);
|
||||
process_subopts([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session = #session{}) ->
|
||||
process_subopts(Opts, Msg, Session);
|
||||
process_subopts([{rap, _}|Opts], Msg = #message{flags = Flags}, Session) ->
|
||||
process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session);
|
||||
process_subopts([{subid, SubId}|Opts], Msg, State) ->
|
||||
process_subopts(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).
|
||||
|
||||
|
@ -1020,9 +1050,19 @@ drain_q(Cnt, Msgs, Q) ->
|
|||
case emqx_mqueue:out(Q) of
|
||||
{empty, _Q} -> {Msgs, Q};
|
||||
{{value, Msg}, Q1} ->
|
||||
drain_q(Cnt-1, [Msg|Msgs], Q1)
|
||||
case emqx_message:is_expired(Msg) of
|
||||
true ->
|
||||
ok = emqx_metrics:inc('messages.expired'),
|
||||
drain_q(Cnt, Msgs, Q1);
|
||||
false ->
|
||||
drain_q(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
|
||||
end
|
||||
end.
|
||||
|
||||
-compile({inline, [acc_cnt/2]}).
|
||||
acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt;
|
||||
acc_cnt(_Msg, Cnt) -> Cnt - 1.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Ensure timers
|
||||
|
||||
|
@ -1130,3 +1170,9 @@ do_subscribe(ClientId, Username, Topic, SubOpts, SubMap) ->
|
|||
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
|
||||
maps:put(Topic, SubOpts, SubMap)
|
||||
end.
|
||||
|
||||
exceeded_subscription_quota(0, _SubMap) ->
|
||||
false;
|
||||
exceeded_subscription_quota(Max, SubMap) ->
|
||||
maps:size(SubMap) >= Max.
|
||||
|
||||
|
|
|
@ -117,7 +117,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
|
|||
catch
|
||||
_:Error:_Stk ->
|
||||
unregister_session(ClientId, SessPid),
|
||||
?LOG(warning, "Failed to discard ~p: ~p", [SessPid, Error])
|
||||
?LOG(notice, "Failed to discard ~p: ~p", [SessPid, Error])
|
||||
end
|
||||
end, lookup_session_pids(ClientId)).
|
||||
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
|
||||
-module(emqx_topic).
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
%% APIs
|
||||
-export([ match/2
|
||||
, validate/1
|
||||
|
@ -33,19 +31,23 @@
|
|||
, parse/2
|
||||
]).
|
||||
|
||||
-export_type([ group/0
|
||||
, topic/0
|
||||
, word/0
|
||||
, triple/0
|
||||
]).
|
||||
|
||||
-type(group() :: binary()).
|
||||
-type(topic() :: binary()).
|
||||
-type(word() :: '' | '+' | '#' | binary()).
|
||||
-type(words() :: list(word())).
|
||||
-opaque(triple() :: {root | binary(), word(), binary()}).
|
||||
|
||||
-export_type([group/0, topic/0, word/0, triple/0]).
|
||||
|
||||
-define(MAX_TOPIC_LEN, 4096).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Is wildcard topic?
|
||||
-spec(wildcard(topic() | words()) -> true | false).
|
||||
|
@ -60,7 +62,7 @@ wildcard(['+'|_]) ->
|
|||
wildcard([_H|T]) ->
|
||||
wildcard(T).
|
||||
|
||||
%% @doc Match Topic name with filter
|
||||
%% @doc Match Topic name with filter.
|
||||
-spec(match(Name, Filter) -> boolean() when
|
||||
Name :: topic() | words(),
|
||||
Filter :: topic() | words()).
|
||||
|
@ -68,7 +70,7 @@ match(<<$$, _/binary>>, <<$+, _/binary>>) ->
|
|||
false;
|
||||
match(<<$$, _/binary>>, <<$#, _/binary>>) ->
|
||||
false;
|
||||
match(Name, Filter) when is_binary(Name) and is_binary(Filter) ->
|
||||
match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
|
||||
match(words(Name), words(Filter));
|
||||
match([], []) ->
|
||||
true;
|
||||
|
@ -95,13 +97,15 @@ validate({Type, Topic}) when Type =:= name; Type =:= filter ->
|
|||
-spec(validate(name | filter, topic()) -> true).
|
||||
validate(_, <<>>) ->
|
||||
error(empty_topic);
|
||||
validate(_, Topic) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) ->
|
||||
validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
|
||||
error(topic_too_long);
|
||||
validate(filter, Topic) when is_binary(Topic) ->
|
||||
validate2(words(Topic));
|
||||
validate(name, Topic) when is_binary(Topic) ->
|
||||
Words = words(Topic),
|
||||
validate2(Words) and (not wildcard(Words)).
|
||||
validate2(Words)
|
||||
andalso (not wildcard(Words))
|
||||
orelse error(topic_name_error).
|
||||
|
||||
validate2([]) ->
|
||||
true;
|
||||
|
@ -123,7 +127,7 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
|
|||
validate3(<<_/utf8, Rest/binary>>) ->
|
||||
validate3(Rest).
|
||||
|
||||
%% @doc Topic to triples
|
||||
%% @doc Topic to triples.
|
||||
-spec(triples(topic()) -> list(triple())).
|
||||
triples(Topic) when is_binary(Topic) ->
|
||||
triples(words(Topic), root, []).
|
||||
|
@ -206,27 +210,29 @@ join(Words) ->
|
|||
end, {true, <<>>}, [bin(W) || W <- Words]),
|
||||
Bin.
|
||||
|
||||
-spec(parse(topic()) -> {topic(), #{}}).
|
||||
parse(Topic) when is_binary(Topic) ->
|
||||
parse(Topic, #{}).
|
||||
-spec(parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}).
|
||||
parse(TopicFilter) when is_binary(TopicFilter) ->
|
||||
parse(TopicFilter, #{});
|
||||
parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
|
||||
parse(TopicFilter, Options).
|
||||
|
||||
parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) ->
|
||||
error({invalid_topic, Topic});
|
||||
parse(Topic = <<?SHARE, "/", _/binary>>, #{share := _Group}) ->
|
||||
error({invalid_topic, Topic});
|
||||
parse(<<"$queue/", Topic1/binary>>, Options) ->
|
||||
parse(Topic1, maps:put(share, <<"$queue">>, Options));
|
||||
parse(Topic = <<?SHARE, "/", Topic1/binary>>, Options) ->
|
||||
case binary:split(Topic1, <<"/">>) of
|
||||
[<<>>] -> error({invalid_topic, Topic});
|
||||
[_] -> error({invalid_topic, Topic});
|
||||
[Group, Topic2] ->
|
||||
case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of
|
||||
nomatch -> {Topic2, maps:put(share, Group, Options)};
|
||||
_ -> error({invalid_topic, Topic})
|
||||
-spec(parse(topic(), map()) -> {topic(), map()}).
|
||||
parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
|
||||
error({invalid_topic_filter, TopicFilter});
|
||||
parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
|
||||
error({invalid_topic_filter, TopicFilter});
|
||||
parse(<<"$queue/", TopicFilter/binary>>, Options) ->
|
||||
parse(TopicFilter, Options#{share => <<"$queue">>});
|
||||
parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
|
||||
case binary:split(Rest, <<"/">>) of
|
||||
[_Any] -> error({invalid_topic_filter, TopicFilter});
|
||||
[ShareName, Filter] ->
|
||||
case binary:match(ShareName, [<<"+">>, <<"#">>]) of
|
||||
nomatch -> parse(Filter, Options#{share => ShareName});
|
||||
_ -> error({invalid_topic_filter, TopicFilter})
|
||||
end
|
||||
end;
|
||||
parse(Topic, Options = #{qos := QoS}) ->
|
||||
{Topic, Options#{rc => QoS}};
|
||||
parse(Topic, Options) ->
|
||||
{Topic, Options}.
|
||||
parse(TopicFilter, Options = #{qos := QoS}) ->
|
||||
{TopicFilter, Options#{rc => QoS}};
|
||||
parse(TopicFilter, Options) ->
|
||||
{TopicFilter, Options}.
|
|
@ -14,34 +14,19 @@
|
|||
|
||||
-module(emqx_tracer).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-logger_header("[Tracer]").
|
||||
|
||||
%% APIs
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([ trace/2
|
||||
, start_trace/3
|
||||
, lookup_traces/0
|
||||
, stop_trace/1
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([ init/1
|
||||
, handle_call/3
|
||||
, handle_cast/2
|
||||
, handle_info/2
|
||||
, terminate/2
|
||||
, code_change/3
|
||||
]).
|
||||
|
||||
-record(state, {traces}).
|
||||
|
||||
-type(trace_who() :: {client_id | topic, binary()}).
|
||||
-type(trace_who() :: {client_id | topic, binary() | list()}).
|
||||
|
||||
-define(TRACER, ?MODULE).
|
||||
-define(FORMAT, {emqx_logger_formatter,
|
||||
|
@ -55,65 +40,62 @@
|
|||
[peername," "],
|
||||
[]}]},
|
||||
msg,"\n"]}}).
|
||||
-define(TOPIC_TRACE_ID(T), "trace_topic_"++T).
|
||||
-define(CLIENT_TRACE_ID(C), "trace_clientid_"++C).
|
||||
-define(TOPIC_TRACE(T), {topic,T}).
|
||||
-define(CLIENT_TRACE(C), {client_id,C}).
|
||||
|
||||
-define(is_log_level(L),
|
||||
L =:= emergency orelse
|
||||
L =:= alert orelse
|
||||
L =:= critical orelse
|
||||
L =:= error orelse
|
||||
L =:= warning orelse
|
||||
L =:= notice orelse
|
||||
L =:= info orelse
|
||||
L =:= debug).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?TRACER}, ?MODULE, [], []).
|
||||
|
||||
trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
|
||||
%% Dont' trace '$SYS' publish
|
||||
%% Do not trace '$SYS' publish
|
||||
ignore;
|
||||
trace(publish, #message{from = From, topic = Topic, payload = Payload})
|
||||
when is_binary(From); is_atom(From) ->
|
||||
emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Start/Stop trace
|
||||
%%------------------------------------------------------------------------------
|
||||
emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, "PUBLISH to ~s: ~p", [Topic, Payload]).
|
||||
|
||||
%% @doc Start to trace client_id or topic.
|
||||
-spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
|
||||
start_trace({client_id, ClientId}, Level, LogFile) ->
|
||||
do_start_trace({client_id, ClientId}, Level, LogFile);
|
||||
start_trace({topic, Topic}, Level, LogFile) ->
|
||||
do_start_trace({topic, Topic}, Level, LogFile).
|
||||
|
||||
do_start_trace(Who, Level, LogFile) ->
|
||||
start_trace(Who, all, LogFile) ->
|
||||
start_trace(Who, debug, LogFile);
|
||||
start_trace(Who, Level, LogFile) ->
|
||||
case ?is_log_level(Level) of
|
||||
true ->
|
||||
#{level := PrimaryLevel} = logger:get_primary_config(),
|
||||
try logger:compare_levels(log_level(Level), PrimaryLevel) of
|
||||
try logger:compare_levels(Level, PrimaryLevel) of
|
||||
lt ->
|
||||
{error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])};
|
||||
_GtOrEq ->
|
||||
gen_server:call(?MODULE, {start_trace, Who, Level, LogFile}, 5000)
|
||||
install_trace_handler(Who, Level, LogFile)
|
||||
catch
|
||||
_:Error ->
|
||||
{error, Error}
|
||||
end;
|
||||
false -> {error, {invalid_log_level, Level}}
|
||||
end.
|
||||
|
||||
%% @doc Stop tracing client_id or topic.
|
||||
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
|
||||
stop_trace({client_id, ClientId}) ->
|
||||
gen_server:call(?MODULE, {stop_trace, {client_id, ClientId}});
|
||||
stop_trace({topic, Topic}) ->
|
||||
gen_server:call(?MODULE, {stop_trace, {topic, Topic}}).
|
||||
stop_trace(Who) ->
|
||||
uninstall_trance_handler(Who).
|
||||
|
||||
%% @doc Lookup all traces
|
||||
-spec(lookup_traces() -> [{Who :: trace_who(), LogFile :: string()}]).
|
||||
lookup_traces() ->
|
||||
gen_server:call(?TRACER, lookup_traces).
|
||||
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers()).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
{ok, #state{traces = #{}}}.
|
||||
|
||||
handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = Traces}) ->
|
||||
install_trace_handler(Who, Level, LogFile) ->
|
||||
case logger:add_handler(handler_id(Who), logger_disk_log_h,
|
||||
#{level => Level,
|
||||
formatter => ?FORMAT,
|
||||
|
@ -121,54 +103,37 @@ handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = T
|
|||
config => #{type => halt, file => LogFile},
|
||||
filter_default => stop,
|
||||
filters => [{meta_key_filter,
|
||||
{fun filter_by_meta_key/2, Who} }]}) of
|
||||
{fun filter_by_meta_key/2, Who}}]})
|
||||
of
|
||||
ok ->
|
||||
?LOG(info, "Start trace for ~p", [Who]),
|
||||
{reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}};
|
||||
?LOG(info, "Start trace for ~p", [Who]);
|
||||
{error, Reason} ->
|
||||
?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
|
||||
{reply, {error, Reason}, State}
|
||||
end;
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) ->
|
||||
case maps:find(Who, Traces) of
|
||||
{ok, _LogFile} ->
|
||||
uninstall_trance_handler(Who) ->
|
||||
case logger:remove_handler(handler_id(Who)) of
|
||||
ok ->
|
||||
?LOG(info, "Stop trace for ~p", [Who]);
|
||||
{error, Reason} ->
|
||||
?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason])
|
||||
end,
|
||||
{reply, ok, State#state{traces = maps:remove(Who, Traces)}};
|
||||
error ->
|
||||
{reply, {error, not_found}, State}
|
||||
end;
|
||||
?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
handle_call(lookup_traces, _From, State = #state{traces = Traces}) ->
|
||||
{reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State};
|
||||
filter_traces({Id, Level, Dst}, Acc) ->
|
||||
case atom_to_list(Id) of
|
||||
?TOPIC_TRACE_ID(T)->
|
||||
[{?TOPIC_TRACE(T), {Level,Dst}} | Acc];
|
||||
?CLIENT_TRACE_ID(C) ->
|
||||
[{?CLIENT_TRACE(C), {Level,Dst}} | Acc];
|
||||
_ -> Acc
|
||||
end.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
handler_id({topic, Topic}) ->
|
||||
list_to_atom("topic_" ++ binary_to_list(Topic));
|
||||
handler_id({client_id, ClientId}) ->
|
||||
list_to_atom("clientid_" ++ binary_to_list(ClientId)).
|
||||
handler_id(?TOPIC_TRACE(Topic)) ->
|
||||
list_to_atom(?TOPIC_TRACE_ID(str(Topic)));
|
||||
handler_id(?CLIENT_TRACE(ClientId)) ->
|
||||
list_to_atom(?CLIENT_TRACE_ID(str(ClientId))).
|
||||
|
||||
filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
|
||||
case maps:find(MetaKey, Meta) of
|
||||
|
@ -181,13 +146,6 @@ filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
|
|||
_ -> ignore
|
||||
end.
|
||||
|
||||
log_level(emergency) -> emergency;
|
||||
log_level(alert) -> alert;
|
||||
log_level(critical) -> critical;
|
||||
log_level(error) -> error;
|
||||
log_level(warning) -> warning;
|
||||
log_level(notice) -> notice;
|
||||
log_level(info) -> info;
|
||||
log_level(debug) -> debug;
|
||||
log_level(all) -> debug;
|
||||
log_level(_) -> throw(invalid_log_level).
|
||||
str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
|
||||
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
||||
str(Str) when is_list(Str) -> Str.
|
||||
|
|
|
@ -70,6 +70,7 @@ receive_messages(Count, Msgs) ->
|
|||
basic_test(_Config) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
ct:print("Basic test starting"),
|
||||
init_caps(),
|
||||
{ok, C} = emqx_client:start_link(),
|
||||
{ok, _} = emqx_client:connect(C),
|
||||
{ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1),
|
||||
|
@ -81,6 +82,7 @@ basic_test(_Config) ->
|
|||
ok = emqx_client:disconnect(C).
|
||||
|
||||
will_message_test(_Config) ->
|
||||
init_caps(),
|
||||
{ok, C1} = emqx_client:start_link([{clean_start, true},
|
||||
{will_topic, nth(3, ?TOPICS)},
|
||||
{will_payload, <<"client disconnected">>},
|
||||
|
@ -99,10 +101,10 @@ will_message_test(_Config) ->
|
|||
ct:print("Will message test succeeded").
|
||||
|
||||
offline_message_queueing_test(_) ->
|
||||
init_caps(),
|
||||
{ok, C1} = emqx_client:start_link([{clean_start, false},
|
||||
{client_id, <<"c1">>}]),
|
||||
{ok, _} = emqx_client:connect(C1),
|
||||
|
||||
{ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
|
||||
ok = emqx_client:disconnect(C1),
|
||||
{ok, C2} = emqx_client:start_link([{clean_start, true},
|
||||
|
@ -123,6 +125,7 @@ offline_message_queueing_test(_) ->
|
|||
?assertEqual(3, length(receive_messages(3))).
|
||||
|
||||
overlapping_subscriptions_test(_) ->
|
||||
init_caps(),
|
||||
{ok, C} = emqx_client:start_link([]),
|
||||
{ok, _} = emqx_client:connect(C),
|
||||
|
||||
|
@ -163,6 +166,7 @@ overlapping_subscriptions_test(_) ->
|
|||
|
||||
redelivery_on_reconnect_test(_) ->
|
||||
ct:print("Redelivery on reconnect test starting"),
|
||||
init_caps(),
|
||||
{ok, C1} = emqx_client:start_link([{clean_start, false},
|
||||
{client_id, <<"c">>}]),
|
||||
{ok, _} = emqx_client:connect(C1),
|
||||
|
@ -194,6 +198,7 @@ redelivery_on_reconnect_test(_) ->
|
|||
|
||||
dollar_topics_test(_) ->
|
||||
ct:print("$ topics test starting"),
|
||||
init_caps(),
|
||||
{ok, C} = emqx_client:start_link([{clean_start, true},
|
||||
{keepalive, 0}]),
|
||||
{ok, _} = emqx_client:connect(C),
|
||||
|
@ -205,3 +210,13 @@ dollar_topics_test(_) ->
|
|||
?assertEqual(0, length(receive_messages(1))),
|
||||
ok = emqx_client:disconnect(C),
|
||||
ct:print("$ topics test succeeded").
|
||||
|
||||
init_caps() ->
|
||||
Caps = #{max_qos_allowed => 2,
|
||||
max_topic_levels => 0,
|
||||
mqtt_shared_subscription => true,
|
||||
mqtt_wildcard_subscription => true,
|
||||
max_topic_alias => 0,
|
||||
mqtt_retain_available => true},
|
||||
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
|
||||
timer:sleep(100).
|
|
@ -27,38 +27,19 @@ all() -> [t_get_set_caps, t_check_pub, t_check_sub].
|
|||
|
||||
t_get_set_caps(_) ->
|
||||
{ok, _} = emqx_zone:start_link(),
|
||||
Caps = #{
|
||||
max_packet_size => ?MAX_PACKET_SIZE,
|
||||
max_clientid_len => ?MAX_CLIENTID_LEN,
|
||||
max_topic_alias => 0,
|
||||
max_topic_levels => 0,
|
||||
max_qos_allowed => ?QOS_2,
|
||||
mqtt_retain_available => true,
|
||||
mqtt_shared_subscription => true,
|
||||
mqtt_wildcard_subscription => true
|
||||
},
|
||||
Caps2 = Caps#{max_packet_size => 1048576},
|
||||
case emqx_mqtt_caps:get_caps(zone) of
|
||||
Caps -> ok;
|
||||
Caps2 -> ok
|
||||
end,
|
||||
PubCaps = #{
|
||||
max_qos_allowed => ?QOS_2,
|
||||
mqtt_retain_available => true,
|
||||
max_topic_alias => 0
|
||||
},
|
||||
PubCaps = emqx_mqtt_caps:get_caps(zone, publish),
|
||||
|
||||
PubCaps = emqx_mqtt_caps:get_caps(external, publish),
|
||||
NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1},
|
||||
emqx_zone:set_env(zone, '$mqtt_pub_caps', NewPubCaps),
|
||||
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(NewPubCaps)],
|
||||
timer:sleep(100),
|
||||
NewPubCaps = emqx_mqtt_caps:get_caps(zone, publish),
|
||||
SubCaps = #{
|
||||
max_topic_levels => 0,
|
||||
max_qos_allowed => ?QOS_2,
|
||||
mqtt_shared_subscription => true,
|
||||
mqtt_wildcard_subscription => true
|
||||
},
|
||||
SubCaps = emqx_mqtt_caps:get_caps(zone, subscribe),
|
||||
NewPubCaps = emqx_mqtt_caps:get_caps(external, publish),
|
||||
|
||||
SubCaps = emqx_mqtt_caps:get_caps(external, subscribe),
|
||||
NewSubCaps = SubCaps#{max_topic_levels => 2},
|
||||
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(NewSubCaps)],
|
||||
timer:sleep(100),
|
||||
NewSubCaps = emqx_mqtt_caps:get_caps(external, subscribe),
|
||||
|
||||
emqx_zone:stop().
|
||||
|
||||
t_check_pub(_) ->
|
||||
|
@ -68,35 +49,34 @@ t_check_pub(_) ->
|
|||
mqtt_retain_available => false,
|
||||
max_topic_alias => 4
|
||||
},
|
||||
emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
|
||||
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(PubCaps)],
|
||||
timer:sleep(100),
|
||||
ct:log("~p", [emqx_mqtt_caps:get_caps(zone, publish)]),
|
||||
ct:log("~p", [emqx_mqtt_caps:get_caps(external, publish)]),
|
||||
BadPubProps1 = #{
|
||||
qos => ?QOS_2,
|
||||
retain => false
|
||||
},
|
||||
{error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps1),
|
||||
{error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(external, BadPubProps1),
|
||||
BadPubProps2 = #{
|
||||
qos => ?QOS_1,
|
||||
retain => true
|
||||
},
|
||||
{error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps2),
|
||||
{error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(external, BadPubProps2),
|
||||
BadPubProps3 = #{
|
||||
qos => ?QOS_1,
|
||||
retain => false,
|
||||
topic_alias => 5
|
||||
},
|
||||
{error, ?RC_TOPIC_ALIAS_INVALID} = emqx_mqtt_caps:check_pub(zone, BadPubProps3),
|
||||
{error, ?RC_TOPIC_ALIAS_INVALID} = emqx_mqtt_caps:check_pub(external, BadPubProps3),
|
||||
PubProps = #{
|
||||
qos => ?QOS_1,
|
||||
retain => false
|
||||
},
|
||||
ok = emqx_mqtt_caps:check_pub(zone, PubProps),
|
||||
ok = emqx_mqtt_caps:check_pub(external, PubProps),
|
||||
emqx_zone:stop().
|
||||
|
||||
t_check_sub(_) ->
|
||||
{ok, _} = emqx_zone:start_link(),
|
||||
|
||||
Opts = #{qos => ?QOS_2, share => true, rc => 0},
|
||||
Caps = #{
|
||||
max_topic_levels => 0,
|
||||
|
@ -104,6 +84,8 @@ t_check_sub(_) ->
|
|||
mqtt_shared_subscription => true,
|
||||
mqtt_wildcard_subscription => true
|
||||
},
|
||||
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
|
||||
timer:sleep(100),
|
||||
|
||||
ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]),
|
||||
ok = do_check_sub(Caps#{max_qos_allowed => ?QOS_1}, [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{qos => ?QOS_1}}]),
|
||||
|
@ -122,10 +104,10 @@ t_check_sub(_) ->
|
|||
|
||||
|
||||
do_check_sub(TopicFilters, Topics) ->
|
||||
{ok, Topics} = emqx_mqtt_caps:check_sub(zone, TopicFilters),
|
||||
{ok, Topics} = emqx_mqtt_caps:check_sub(external, TopicFilters),
|
||||
ok.
|
||||
do_check_sub(Caps, TopicFilters, Topics) ->
|
||||
emqx_zone:set_env(zone, '$mqtt_sub_caps', Caps),
|
||||
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
|
||||
timer:sleep(100),
|
||||
{_, Topics} = emqx_mqtt_caps:check_sub(zone, TopicFilters),
|
||||
{_, Topics} = emqx_mqtt_caps:check_sub(external, TopicFilters),
|
||||
ok.
|
||||
|
|
|
@ -56,7 +56,7 @@ groups() ->
|
|||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
|
||||
MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()),
|
||||
emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}),
|
||||
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(MqttCaps#{max_topic_alias => 20})],
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
all() -> [ignore_loop, t_session_all].
|
||||
all() -> [ignore_loop, t_session_all, t_message_expiry_interval_1, t_message_expiry_interval_2].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
|
@ -66,3 +66,63 @@ t_session_all(_) ->
|
|||
timer:sleep(200),
|
||||
[] = emqx:subscriptions(SPid),
|
||||
emqx_mock_client:close_session(ConnPid).
|
||||
|
||||
t_message_expiry_interval_1(_) ->
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]].
|
||||
|
||||
t_message_expiry_interval_2(_) ->
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]].
|
||||
|
||||
message_expiry_interval_init() ->
|
||||
{ok, ClientA} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, ClientB} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqx_client:connect(ClientA),
|
||||
{ok, _} = emqx_client:connect(ClientB),
|
||||
%% subscribe and disconnect client-b
|
||||
emqx_client:subscribe(ClientB, <<"t/a">>, 1),
|
||||
emqx_client:stop(ClientB),
|
||||
ClientA.
|
||||
|
||||
message_expiry_interval_exipred(ClientA, QoS) ->
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a and waiting for the message expired
|
||||
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
ct:sleep(2000),
|
||||
|
||||
%% resume the session for client-b
|
||||
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqx_client:connect(ClientB1),
|
||||
|
||||
%% verify client-b could not receive the publish message
|
||||
receive
|
||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
|
||||
ct:fail(should_have_expired)
|
||||
after 300 ->
|
||||
ok
|
||||
end,
|
||||
emqx_client:stop(ClientB1).
|
||||
|
||||
message_expiry_interval_not_exipred(ClientA, QoS) ->
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a
|
||||
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
|
||||
%% wait for 1s and then resume the session for client-b, the message should not expires
|
||||
%% as Message-Expiry-Interval = 20s
|
||||
ct:sleep(1000),
|
||||
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqx_client:connect(ClientB1),
|
||||
|
||||
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
|
||||
receive
|
||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>,
|
||||
properties := #{'Message-Expiry-Interval' := MsgExpItvl}}}
|
||||
when MsgExpItvl < 20 -> ok;
|
||||
{publish, _} = Msg ->
|
||||
ct:fail({incorrect_publish, Msg})
|
||||
after 300 ->
|
||||
ct:fail(no_publish_received)
|
||||
end,
|
||||
emqx_client:stop(ClientB1).
|
||||
|
|
|
@ -43,7 +43,7 @@ start_traces(_Config) ->
|
|||
emqx_logger:set_log_level(debug),
|
||||
ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
|
||||
ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
|
||||
{error, invalid_log_level} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"),
|
||||
{error, {invalid_log_level, bad_level}} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"),
|
||||
ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
|
||||
ct:sleep(100),
|
||||
|
||||
|
@ -53,9 +53,9 @@ start_traces(_Config) ->
|
|||
?assert(filelib:is_regular("tmp/topic_trace.log")),
|
||||
|
||||
%% Get current traces
|
||||
?assertEqual([{{client_id,<<"client">>},{debug,"tmp/client.log"}},
|
||||
{{client_id,<<"client2">>},{all,"tmp/client2.log"}},
|
||||
{{topic,<<"a/#">>},{all,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
|
||||
?assertEqual([{{client_id,"client"},{debug,"tmp/client.log"}},
|
||||
{{client_id,"client2"},{debug,"tmp/client2.log"}},
|
||||
{{topic,"a/#"},{debug,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
|
||||
|
||||
%% set the overall log level to debug
|
||||
emqx_logger:set_log_level(debug),
|
||||
|
|
Loading…
Reference in New Issue