Compare commits
165 Commits
Author | SHA1 | Date |
---|---|---|
![]() |
0e97dfff29 | |
![]() |
d409e25e76 | |
![]() |
c1f8d7de2f | |
![]() |
76e6ded825 | |
![]() |
d730b24494 | |
![]() |
978153d993 | |
![]() |
b47a4f4422 | |
![]() |
7124a40f3d | |
![]() |
614b836983 | |
![]() |
85d81a84d0 | |
![]() |
243e6b3571 | |
![]() |
ad5b954f1d | |
![]() |
21293140d3 | |
![]() |
1150e06d18 | |
![]() |
b8cf9e14de | |
![]() |
dbea5df7dc | |
![]() |
ac93725f74 | |
![]() |
1574d50387 | |
![]() |
0a226ca4bc | |
![]() |
9c2584607a | |
![]() |
2d99a1412e | |
![]() |
6f4b8d637b | |
![]() |
0943129287 | |
![]() |
0908974017 | |
![]() |
19a8f0cbf8 | |
![]() |
b4bbfad415 | |
![]() |
68f6a43492 | |
![]() |
ac1e10bb60 | |
![]() |
31671f5ee5 | |
![]() |
ffef64a803 | |
![]() |
58ba22dfc7 | |
![]() |
0e82170ed5 | |
![]() |
2c73ab9713 | |
![]() |
997b693400 | |
![]() |
1a5d8ca3fd | |
![]() |
be2ce93a2c | |
![]() |
90ace6a331 | |
![]() |
4aff37772f | |
![]() |
0daa703193 | |
![]() |
155eb82283 | |
![]() |
3410151c24 | |
![]() |
43cf0fbab4 | |
![]() |
fa7292560a | |
![]() |
bffca305c1 | |
![]() |
a6210f7142 | |
![]() |
2fc3e560d2 | |
![]() |
6ec7cb5090 | |
![]() |
4d14d51dcb | |
![]() |
def758152d | |
![]() |
1c17d514aa | |
![]() |
b8ce42fd29 | |
![]() |
47d628f9d9 | |
![]() |
c9498d7986 | |
![]() |
19fdf3b77a | |
![]() |
1a2592b13c | |
![]() |
86feae6adc | |
![]() |
5835c06745 | |
![]() |
7c3e5d765c | |
![]() |
872c0af3fd | |
![]() |
411f8a0ec7 | |
![]() |
e76451000c | |
![]() |
65ae10a651 | |
![]() |
b9715b9e71 | |
![]() |
0625bfe3f8 | |
![]() |
6b43acc1c1 | |
![]() |
854a48d77c | |
![]() |
ea6d9f12d9 | |
![]() |
5025b2f65d | |
![]() |
97f9e7123d | |
![]() |
f0a434739d | |
![]() |
076776d7d4 | |
![]() |
44d901eb1e | |
![]() |
297a385def | |
![]() |
bd664ae370 | |
![]() |
21c18d15d4 | |
![]() |
af6ad8a90f | |
![]() |
a2a8e42880 | |
![]() |
0f1b678126 | |
![]() |
1fbc50530a | |
![]() |
0c104faef7 | |
![]() |
8662f55c4c | |
![]() |
cc43da0fd5 | |
![]() |
e6ccbc601c | |
![]() |
22084025e6 | |
![]() |
2b1a2f5e13 | |
![]() |
2164c9149b | |
![]() |
21e31ab1c8 | |
![]() |
1aa30cba89 | |
![]() |
79e37cba0d | |
![]() |
127427b783 | |
![]() |
ff0fd66725 | |
![]() |
5a4adefb16 | |
![]() |
24b4d83c12 | |
![]() |
3d45da8e03 | |
![]() |
962fb0cec5 | |
![]() |
dac1b92d8f | |
![]() |
5fb4f23504 | |
![]() |
5f53952b45 | |
![]() |
e349136cb2 | |
![]() |
28bd9a7fda | |
![]() |
c4bf5aa34c | |
![]() |
f837ac47e7 | |
![]() |
b20e87f98e | |
![]() |
ee9f278738 | |
![]() |
8751c10ea5 | |
![]() |
891ef2680e | |
![]() |
b461e26f25 | |
![]() |
d5b17c516e | |
![]() |
bb9c41c9f0 | |
![]() |
88dbbc3a44 | |
![]() |
a6f138b55c | |
![]() |
55ec358cd6 | |
![]() |
067d28dcb6 | |
![]() |
118e67a8ca | |
![]() |
ff9fccdb07 | |
![]() |
87f12256ca | |
![]() |
9c1421f4b8 | |
![]() |
e58b0fc1db | |
![]() |
cd8dacabd6 | |
![]() |
dc76d7ef93 | |
![]() |
be76a6e50a | |
![]() |
b55e76001a | |
![]() |
0adc8b39af | |
![]() |
d191ef0303 | |
![]() |
4aa50f0f6e | |
![]() |
9433e563fb | |
![]() |
aa6ae3ad8a | |
![]() |
8d6a78ce8e | |
![]() |
cad0f1a858 | |
![]() |
b7abf7ca7a | |
![]() |
3c6930b849 | |
![]() |
ed1f428ef6 | |
![]() |
d7e46b367e | |
![]() |
de319fa29a | |
![]() |
d998728847 | |
![]() |
f9dde2f049 | |
![]() |
609968dd31 | |
![]() |
0bcc692071 | |
![]() |
1e5241f401 | |
![]() |
83cfcc5d2f | |
![]() |
010e1fa9a8 | |
![]() |
b9378710d8 | |
![]() |
69665be6f8 | |
![]() |
849f0aaef7 | |
![]() |
1da9e4397e | |
![]() |
f5b77f2e7b | |
![]() |
e7e8131f2a | |
![]() |
f717b734c3 | |
![]() |
5b0c752181 | |
![]() |
92f3036f1f | |
![]() |
58e3555e9b | |
![]() |
e79c714ec4 | |
![]() |
2ee18ddebc | |
![]() |
490ac8f449 | |
![]() |
27fcb73483 | |
![]() |
5039b751f4 | |
![]() |
f13654dbce | |
![]() |
18100cacf9 | |
![]() |
ad5ece8c33 | |
![]() |
1f842e4a19 | |
![]() |
8822468528 | |
![]() |
3c0294eafe | |
![]() |
e722e0f6d1 | |
![]() |
ebbba23276 | |
![]() |
61567c0fcc |
|
@ -28,7 +28,7 @@ git clone https://github.com/emqx/emqx-rel.git
|
||||||
|
|
||||||
cd emqx-rel && make
|
cd emqx-rel && make
|
||||||
|
|
||||||
cd _rel/emqx && ./bin/emqx console
|
cd _build/emqx/rel/emqx && ./bin/emqx console
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
@ -1,17 +1,18 @@
|
||||||
-----BEGIN CERTIFICATE-----
|
-----BEGIN CERTIFICATE-----
|
||||||
MIICxjCCAa6gAwIBAgIJAJk1DbZBu8FDMA0GCSqGSIb3DQEBCwUAMBMxETAPBgNV
|
MIIC0TCCAbmgAwIBAgIUDQN8HojZmyEV9+AzEz6j6juwThswDQYJKoZIhvcNAQEL
|
||||||
BAMMCE15VGVzdENBMB4XDTE3MTEwMjEzNDI0N1oXDTE5MTEwMjEzNDI0N1owEzER
|
BQAwEzERMA8GA1UEAwwITXlUZXN0Q0EwHhcNMTkxMTE1MDcyNjU4WhcNMjkxMTEy
|
||||||
MA8GA1UEAwwITXlUZXN0Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB
|
MDcyNjU4WjATMREwDwYDVQQDDAhNeVRlc3RDQTCCASIwDQYJKoZIhvcNAQEBBQAD
|
||||||
AQDshDho6ef1JClDJ24peSsXdFnFO3xIB7+BSp1YPcOvmRECKUG0mLORw3hNm15m
|
ggEPADCCAQoCggEBALce8QYBpl7fxEhwW0wtBQygXisMcPTKzckz3RhU21TeqK1Z
|
||||||
8eGOn1iLGE/xKlaZ74/xjyq8f7qIGZCmvZj59m+eiJCAmy8SiUJZtSVoOlOzepJd
|
6Fm03QyYvB239oYJLodVwzv5SNI75hZ43Vyp+SHt3M3DjcsU/8PflxFK4QR7TdhI
|
||||||
PoDgcBvDKA4ogZ3iJHMUNI3EdlD6nrKEJF2qe2JUrL0gv65uo2/N7XVNvE87Dk3J
|
ddn6R59Gqt0MhAZ/df2dYt7cMaQV8/5plzxLvrv9X2fwo8BYAGp6g6wGAL8SJDT9
|
||||||
83KyCAmeu+x+moS1ILnjs2DuPEGSxZqzf7IQMbXuNWJYAOZg9t4Fg0YjTiAaWw3G
|
jd9TGzBG/o3dLu3keEwcl0CMq3qUwxatBHMe2s7COKBrngD/CvRAL8tG3VTj7ep9
|
||||||
JKAoMY4tI3JCqlvwGR4lH7kfk3WsD4ofGlFhxU4nEG0xgnJl8BcoJWD1A2RjGe1f
|
n29SSS8qMzHhJdBahTDrYS+SeW61iFK1yLXSxCWNoMB0/g7/AktWuAXHdHRX9xaf
|
||||||
qCijqPSe93l2wt8OpbyHzwc7AgMBAAGjHTAbMAwGA1UdEwQFMAMBAf8wCwYDVR0P
|
WNJ4RdoPxhqkVJ8SrC4JtC8ah6DchVysWnz2KwMCAwEAAaMdMBswDAYDVR0TBAUw
|
||||||
BAQDAgEGMA0GCSqGSIb3DQEBCwUAA4IBAQAi+t5jBrMxFzoF76kyRd3riNDlWp0w
|
AwEB/zALBgNVHQ8EBAMCAQYwDQYJKoZIhvcNAQELBQADggEBAEgnPnHLdivykReJ
|
||||||
NCewkohBkwBHsQfHzSnc6c504jdyzkEiD42UcI8asPsJcsYrQ+Uo6OBn049u49Wn
|
I8xf5DeWsgBUdVvhxz2E9Ole/u6ThulNLziwHernkTprskiKFJaF67ZzS7YddTdf
|
||||||
zcSERVSVec1/TAPS/egFTU9QMWtPSAm8AEaQ6YYAuiwOLCcC+Cm/a3e3dWSRWt8o
|
WsS0H5LhYaft5NnBcn9UHCKEycyr3AJZ6joB3Dd9CfMQEscnZHNmIXwPGxw4bYP6
|
||||||
LqKX6CWTlmKWe182MhFPpZYxZQLGapti4R4mb5QusUbc6tXbkcX82GjDPTOuAw7b
|
AElF0Iy7LY/Z8po/UACTBzCCSf5UkZ9Jy/rzxuvn/cfPcLNhDWk8b8MbmOfuyNPV
|
||||||
mWpzVd5xnlp7Vz+50u+YaAYUmCobg0hR/AuTrA4GDMlgzTnuZQhF6o8iVkypXOtS
|
SfPGn7wXIt9iyyA4qyzEVMaXl8d94E48dV5Fc1sQEEo6gk16dQ9p64ePMvUih6an
|
||||||
Ufz6X3tVVErVVc7UUfzSnupHj1M2h4rzlQ3oqHoAEnXcJmV4f/Pf/6FW
|
kSz9X/n1+9sHq54pJmLZ2gfRvGPIPVIipSjAj4sjHvKzuC3CQTTXs9HzmN2nT0zx
|
||||||
|
gLxgEkY=
|
||||||
-----END CERTIFICATE-----
|
-----END CERTIFICATE-----
|
||||||
|
|
|
@ -1,18 +1,18 @@
|
||||||
-----BEGIN CERTIFICATE-----
|
-----BEGIN CERTIFICATE-----
|
||||||
MIIC6jCCAdKgAwIBAgIBATANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
|
MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
|
||||||
c3RDQTAeFw0xNzExMDIxMzQyNDhaFw0xOTExMDIxMzQyNDhaMC0xGjAYBgNVBAMT
|
c3RDQTAeFw0xOTExMTUwNzI2NThaFw0yMTExMTQwNzI2NThaMCoxFzAVBgNVBAMM
|
||||||
EU1hY0Jvb2stQWlyLmxvY2FsMQ8wDQYDVQQKEwZzZXJ2ZXIwggEiMA0GCSqGSIb3
|
DjAwMDQubm92YWxvY2FsMQ8wDQYDVQQKDAZzZXJ2ZXIwggEiMA0GCSqGSIb3DQEB
|
||||||
DQEBAQUAA4IBDwAwggEKAoIBAQDUO/kL3ar3WsopPF12qAf+cwDHklGJIxJsjdoZ
|
AQUAA4IBDwAwggEKAoIBAQDC5JE48PJ/BFTLEseEbrGIdYB6w29hme4KFKmAqlLQ
|
||||||
XgI1lPEe1W1QXwb/G/tyf6Fj2J8CD5bfsRjDxAemFIBVrFwlunCk+Gs6xR7vzz4O
|
kpwwZJAsm/9iuXy6svJf7Tzzc173Jkgzw7DzhzSf1VgRDrOCQS+IU6s8UXfUMJt/
|
||||||
Fonoj4pmleruLQrNY/bHa2WN97OdISyXzhOgDwSaqobnF0n/f0Mx+9sdHO3p8LNB
|
AmP1SkU2mUJ/+pnEGRKtVkF9LCScinI95Iwt3xngdjMYXwk+S9Le3/8782ClBwZG
|
||||||
3JXUyBpwDNr/TTfAb4pbQEu3LF4p7uyd1eLhKzUxSiWzKtjB1EYObA87fZu0tBJZ
|
vffXQ7hd5HnShgyqFVePgrKmr879NTylfvAWPwux2kdXNnbOHIrhcZm0NeMNf7hs
|
||||||
iGujuFiI7tf4qWKeuAoRa/cXkgVZhk0utYauDoa7qBZ5O6ZdEko9ov0+i5+1JGU/
|
UNURFlqo4rA0FV9dIHMryPkM7ygoaMog2XmcCnq/jf/MfPTQPYjQ9iLPOGrYi0pY
|
||||||
w5wrSPNAnM2lYVUn0kJmcV2gwa4RZFjdqp+/Fx+HnKbnhZEnAgMBAAGjLzAtMAkG
|
X12uFb55duRGsvs7MIkNc8fn2VERoC69QX+GK+zAUGZ/AgMBAAGjLzAtMAkGA1Ud
|
||||||
A1UdEwQCMAAwCwYDVR0PBAQDAgUgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA0GCSqG
|
EwQCMAAwCwYDVR0PBAQDAgUgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA0GCSqGSIb3
|
||||||
SIb3DQEBCwUAA4IBAQByWhNxX/L5QYBiMY4JM1RRciV4uI3F2vsc0yMFDSrZza+5
|
DQEBCwUAA4IBAQBpW7Ge5duo6/u3xIl0XhG/2dlSwlUUpO3Ecc13gmh44nJR66VH
|
||||||
tNJQS86hjQsCRZh9VshezvT7k1yVsAC4pnu2pzob8H3KG4vYBafMdl2Ghgv3RMix
|
BEiimsol6gIgcSTk4pVY1DLb/09Nwv0TILl3Dc4QtXhM4gIlNRR79mLVsnPTef5e
|
||||||
J3NrBhcoYYhXEoZHost+htxEi7P3QBo/qDkk48/d30+aDPbms6kQd8Fj8+C5tD3b
|
xkmesQaLihSCroHq8bONnO/Xgj5hCg8uI4j3vHtOikjABxQPOrCfc2uSrenU7aol
|
||||||
aznO5Qlni72uTaM7fNA8exoc/YZc83lsqv7v+UzNQR595jnYSIAZcgil1qqygOan
|
1HBijCY6R+pg6WxBOZ2Teiaoxjn78IxSKLXW0pLRJIPpet1hefR0sKkmPfVGyg8H
|
||||||
Zx/RsMGUz6EYI9lPpoyyVtw13SoQshfgwvUlvBMiekSuI/pp6N7QPK6C8DLO0tVv
|
g7hqo+Houw8PQf2HLZnU656vyTlgIh6ES1x7Plb0cIw/LGr4rMkXs+DFg9SLbetT
|
||||||
gXJjDgioqHc3hcgG4cskLbfVnohiwdhQTFayrLEk
|
ncT4plfucsek7ImN9Dw2w2hM2FZwB8ycZfmu
|
||||||
-----END CERTIFICATE-----
|
-----END CERTIFICATE-----
|
||||||
|
|
|
@ -1,18 +1,18 @@
|
||||||
-----BEGIN CERTIFICATE-----
|
-----BEGIN CERTIFICATE-----
|
||||||
MIIC6jCCAdKgAwIBAgIBAjANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
|
MIIC5zCCAc+gAwIBAgIBAjANBgkqhkiG9w0BAQsFADATMREwDwYDVQQDDAhNeVRl
|
||||||
c3RDQTAeFw0xNzExMDIxMzQyNDhaFw0xOTExMDIxMzQyNDhaMC0xGjAYBgNVBAMT
|
c3RDQTAeFw0xOTExMTUwNzI2NThaFw0yMTExMTQwNzI2NThaMCoxFzAVBgNVBAMM
|
||||||
EU1hY0Jvb2stQWlyLmxvY2FsMQ8wDQYDVQQKEwZjbGllbnQwggEiMA0GCSqGSIb3
|
DjAwMDQubm92YWxvY2FsMQ8wDQYDVQQKDAZjbGllbnQwggEiMA0GCSqGSIb3DQEB
|
||||||
DQEBAQUAA4IBDwAwggEKAoIBAQC8GptpL25hv1Qa3jCn4VLvDRH/SrHg9wXvqRkz
|
AQUAA4IBDwAwggEKAoIBAQDcwo5SaoRpzkqy+Y9OADOL7U84h1VFfjb5Uu5raenO
|
||||||
HuiKMxYT30m4+kcaXv350CJrkV+8lR24wdN7DBVewpCUnyUBbzkLccy1LUzunZ3z
|
elmHSaCZpVP2EsDUaWavtabHd9fa5Oq6lOyZPDZM6xttfi78EV4RRfEJ4XdvE54W
|
||||||
nm37j6cautD3rlC9gsC9d0uJ745FLx5t/6f1jMk9rWxn+4iSGAnkWC3mVaQxP1zQ
|
MZSDAGz4RwxfGOQWBSFyp1NrzT32eqeDSyBrE3jhWx9UUUMwthg5YYjCdBwK+Dwf
|
||||||
q8GI97uob9HNb0OH6ygHJAcKOWB+85a29LIMa1uo/lT3hMr8sBg2vX+1F/gTusmW
|
hsfS1YeAfXPNO/BGSTe0dPhjLztXe1BkFO5VAwkSXaPs2lBJddOgpTTLXQ3+hIPL
|
||||||
xVoQc9XJxBCs995qsH0UkZIuOY0XZp9/qFfcZv2QmslG8DojIIHKcujzu8bItE2M
|
ozkiaTOMOvIMXsCspdhJbSc+jAAGZT5X9Tx7htYbPXIwyDJgeYGmLtr9XxPJ8XGR
|
||||||
OyL5NlWLvN6qg59hHzF4+D+T+8GkhhKWSC+xdY14eQ5fB4S5AgMBAAGjLzAtMAkG
|
rpxkB3zASRcwQzsxTcwkG4E32T53tKsljTkNt15rIoo3AgMBAAGjLzAtMAkGA1Ud
|
||||||
A1UdEwQCMAAwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMCMA0GCSqG
|
EwQCMAAwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMCMA0GCSqGSIb3
|
||||||
SIb3DQEBCwUAA4IBAQBLV4ZfhiKiFVnL/xO0MRGSKr3xd0LK64SW8Iw5DYkc0jNX
|
DQEBCwUAA4IBAQBRtQMvUmiB84RmrGwHCP8hcGUWTz03mtTjGrykNA7YQkA09cRl
|
||||||
sDrRbj2I/KJ/Rc4AeKT751L+C+KBzYpFgiLrxDmt/5pmgiFH51hPQtL7kRC0z2NY
|
RwiqYMWh6zHjdX1Ri3m9eIi/QSK/JX3S9zjZU9dSTtsdnMhkRL08kcxauv9gVXCG
|
||||||
EY/P+u4IFVSo+b1hHYU7y+OMj6/Vvd4x0ETS4rHWI4mPDfGfvClEVLOktgRKrMU5
|
G1Vf+lUVJxTqwuAmcLiDNg9/89sSlxQXFS7Jn9TwTvNiRoFoN5IiJ4LsXyr4uS9Y
|
||||||
9aTltF4U0FBUlYZTQBNBUFwBzj1+0lxK4EdhRmmWJ+uW9rgkQxpnUdbCPGvUKFRp
|
S4Ul1aqetwpTV8bjpIbRJbOR8qBFshIZOPdgAT3RqbD/vpGzOvvV0c9g3VFLYoK3
|
||||||
3AbdHBAU9H2zVd2VZoJu6r7LMp6agxu0rYLgmamRAt+8rnDXvy7H1ZNdjT6fTbUO
|
nQ63w1zhwYxC4MQD9rN7JRAKCDQBLNzf8PW0RSG9pVsf1IjaLxtsmQMgrAati/Ux
|
||||||
omVBMyJAc1+10gjpHw/EUD58t5/I5tZrnrANPgIs
|
AG76LAn9sodtb4GtV8E9ITG0pMNlJyUovstS
|
||||||
-----END CERTIFICATE-----
|
-----END CERTIFICATE-----
|
||||||
|
|
|
@ -1,27 +1,27 @@
|
||||||
-----BEGIN RSA PRIVATE KEY-----
|
-----BEGIN RSA PRIVATE KEY-----
|
||||||
MIIEowIBAAKCAQEAvBqbaS9uYb9UGt4wp+FS7w0R/0qx4PcF76kZMx7oijMWE99J
|
MIIEowIBAAKCAQEA3MKOUmqEac5KsvmPTgAzi+1POIdVRX42+VLua2npznpZh0mg
|
||||||
uPpHGl79+dAia5FfvJUduMHTewwVXsKQlJ8lAW85C3HMtS1M7p2d855t+4+nGrrQ
|
maVT9hLA1Glmr7Wmx3fX2uTqupTsmTw2TOsbbX4u/BFeEUXxCeF3bxOeFjGUgwBs
|
||||||
965QvYLAvXdLie+ORS8ebf+n9YzJPa1sZ/uIkhgJ5Fgt5lWkMT9c0KvBiPe7qG/R
|
+EcMXxjkFgUhcqdTa8099nqng0sgaxN44VsfVFFDMLYYOWGIwnQcCvg8H4bH0tWH
|
||||||
zW9Dh+soByQHCjlgfvOWtvSyDGtbqP5U94TK/LAYNr1/tRf4E7rJlsVaEHPVycQQ
|
gH1zzTvwRkk3tHT4Yy87V3tQZBTuVQMJEl2j7NpQSXXToKU0y10N/oSDy6M5Imkz
|
||||||
rPfearB9FJGSLjmNF2aff6hX3Gb9kJrJRvA6IyCBynLo87vGyLRNjDsi+TZVi7ze
|
jDryDF7ArKXYSW0nPowABmU+V/U8e4bWGz1yMMgyYHmBpi7a/V8TyfFxka6cZAd8
|
||||||
qoOfYR8xePg/k/vBpIYSlkgvsXWNeHkOXweEuQIDAQABAoIBAHnFV7peRDzvGUlT
|
wEkXMEM7MU3MJBuBN9k+d7SrJY05DbdeayKKNwIDAQABAoIBAC6ww3Mw7iKGrAvg
|
||||||
cXgcvA2ZDn+QIVsbTzJ466FWbv+YVsCCmj0veHwv5oakIMQ2Fh4FAnqqr3dGuUbg
|
dmuz5TMSFPBKx0E0aaIf5Sc4tmeiPu87Jkl4yyI/YyNJy5scG1MSyMeWJQMjXksm
|
||||||
+avc4p3tHKa2Aul+7ADE9I3TkCt8MZdyPPk6VXZ5gMCmy7X96MIM4Mwg5uBlRZmx
|
jgGEtD9bMcrETZXvqgRB+IW4q3XcNKHkZCe6tyYh2JPDsAhU1XL2bMWFuYouSIP9
|
||||||
/S3Lffvlp/G0y/ICmwpulG1Z4y4A5Vc0Qf7fBO03Ekl31oReARnB6ex7RnDHH1mW
|
EVLwd9bYfRJ/YO4577fY4Nl9GRI9hdOB0Y4dDvxHCprxXC/wH6NpvI5dktTPr2xl
|
||||||
RyLWNqyu9BhUbFpIyFPWDSkBcajNIbQ6qVJfGLm5Y2xVhwdqbyvY8M06uuMKz/IR
|
pNqABKdG8XEzP0duIpQf5zXbfDAWRUEpB9MDBXqmKmdjdPnpNS7JtkmCtWogdA9F
|
||||||
SYfdIpiC4PpQZQzzXMn/6LTKWcCe0T+dBcWTZHC3C2abrC7+5fwFobs2xoUaCwz0
|
LcyFI3e86qB9HHaqq1hBsQEG/DYj0RxCcAQFqTfvpxmZOXDlfWdB7M8xnqkD5xT0
|
||||||
1CclogECgYEA6Jdv+2VSYIBLbS0VIe07JiZaQNd1QNg63MK/y7oqAKEzYvpWzJel
|
s6K1TXECgYEA79Lx5FFxfkN/uZKQzV1slJ/GSyfJqKhRh38/8G1ncmSG5dh0QMht
|
||||||
owPdBU3GxZH6vUUF7sCABcjumEDazoqTtzHQBo0xYpJrjmAL0ANNGVvF09pJK2eq
|
Tt7FbFhYwGZQY9iMq1g/ujlHAzdKbFHGRX0z30xP7kf0R/L0W2yHMq59Ys4nUhGY
|
||||||
yotxJJAS5/lQNSgWOxGVc6qu6ZpgeIXVLIx816yq04h10yVgZ2Lm3+ECgYEAzwj5
|
o1v2sGxgDDP9XPNm/MV8DCZcoLMxvvFrfWLMYcvWTJb8TBGQgqpcEF8CgYEA66Zu
|
||||||
/UVpN/ak6PwZ+Tq/8qOYjY2ABylRmP+T0Rkqmwh2B5Sp9oXjkDQwWseY0Wybhd8F
|
d+l2W5LSTgwYeIAQuiIhhNLY9Ct94TWrum5QZMdeR+IUYn+dT817Qbmf4KiiihfJ
|
||||||
kO6BUCMUApnB3uU0baawVbDUSrt43SkkKV9m3pA35wA3pYw1a56QIEFr56npFYBS
|
V8t3tYgBBamNpqMKpm7An+HnFgRoV3o8W0pjlKdaQ0EiwhTQsLJcmZ1JV/k9Dd4V
|
||||||
sn9yl/ZtNvnuwmrHWOq8HdwPJsWREyO61yknn9kCgYB1PdixpSo4AJOErePoHRfi
|
Rl26M0DZRKTHIUWLt7nNYexydQpfWlfRX1/n9SkCgYAKphUzjCI79wdO2CEx3Tob
|
||||||
rBR0eObez+Aj5Xsea3G+rYMkkkHskUhp+omPodvfPS1h+If8CEbAI7+5OX/R+uJo
|
B1UotSWRJZgpKg9Ov6zeOXR79DaFQeEIpX+ipfGa6XAcXtswKIT74dszW1skoCTr
|
||||||
xpAwrT1Gjb3vn5R0vyU+8havKmoVmgTqYg2fO4x8KBz5HoLONZfbHR9cG3gjaHrD
|
pPmOqrbJ38wK/dC31oPSTkkm//xi+oEKj+TORKGnKQ/Q9sXV53bwmyt1vz8wOUwK
|
||||||
IPHRGXVmeXPDAiUtGBp+oQKBgQDDRIAkNPdMZUCczknhG1w3Cb20pKUAHCRt3YAZ
|
jz6AASsMz494WTdPdf0MhQKBgQDGoBos6JPiy/aH4podt5Rhz7MBCdfkt2P7GAoP
|
||||||
U1cv6gcIl1rGvPko5VBGDsM/ouP8m6CwVYN5hdw1p7eG9z8/vFvMNn/EDJWuYkNN
|
sjwBNiq53E3iWD54rXJfC98+teWLEFGdttrIIEL8StYixvqLHn8uRHNLk5t/YIDP
|
||||||
EkH/4J4ZLcdOSLOJ0X+2LH4Nfd/s+58D49i9IxtXItviWruyTZMnxooz01tFZgmv
|
UfxtqEHkvlpVzMW6qhxzPqg7htF3huHX1djEqrx3p4xQ9xW1Xt9G0s4G6R9GPw8z
|
||||||
LY3F4QKBgDirafhlJqFK6sa8WesHpD5+lm3Opzi4Ua8fAGHy2oHN3WCEL74q691C
|
nNsfQQKBgF1nvj5xhD7fiVzS7NrjtBslDxKGQCfs9f1Xl2eadGp6pgwG/hvw5oO7
|
||||||
fA0P2UrzYiF7dXf4fgK9eMMQsdWS4nKyCSqM6xE4EAhAHUTYzY3ApNjI3XFDIrKC
|
gtoYJuPq+Zu92a+UDQVErXMHiXn3iza/3EOf2BP9zbq9mBGZtKmLExP0QGEmDygb
|
||||||
oQefIOLum2UyWFuEoUtrEfc5fxktiQohCwuAvwC59EwhmsNlECA8
|
Yo18YdfwWwqxvEf+jt2URv0w+KNWL/3j5rDmngNa1iNubX3p1AK6
|
||||||
-----END RSA PRIVATE KEY-----
|
-----END RSA PRIVATE KEY-----
|
||||||
|
|
|
@ -1,27 +1,27 @@
|
||||||
-----BEGIN RSA PRIVATE KEY-----
|
-----BEGIN RSA PRIVATE KEY-----
|
||||||
MIIEogIBAAKCAQEA1Dv5C92q91rKKTxddqgH/nMAx5JRiSMSbI3aGV4CNZTxHtVt
|
MIIEpQIBAAKCAQEAwuSROPDyfwRUyxLHhG6xiHWAesNvYZnuChSpgKpS0JKcMGSQ
|
||||||
UF8G/xv7cn+hY9ifAg+W37EYw8QHphSAVaxcJbpwpPhrOsUe788+DhaJ6I+KZpXq
|
LJv/Yrl8urLyX+0883Ne9yZIM8Ow84c0n9VYEQ6zgkEviFOrPFF31DCbfwJj9UpF
|
||||||
7i0KzWP2x2tljfeznSEsl84ToA8EmqqG5xdJ/39DMfvbHRzt6fCzQdyV1MgacAza
|
NplCf/qZxBkSrVZBfSwknIpyPeSMLd8Z4HYzGF8JPkvS3t//O/NgpQcGRr3310O4
|
||||||
/003wG+KW0BLtyxeKe7sndXi4Ss1MUolsyrYwdRGDmwPO32btLQSWYhro7hYiO7X
|
XeR50oYMqhVXj4Kypq/O/TU8pX7wFj8LsdpHVzZ2zhyK4XGZtDXjDX+4bFDVERZa
|
||||||
+KlinrgKEWv3F5IFWYZNLrWGrg6Gu6gWeTumXRJKPaL9PouftSRlP8OcK0jzQJzN
|
qOKwNBVfXSBzK8j5DO8oKGjKINl5nAp6v43/zHz00D2I0PYizzhq2ItKWF9drhW+
|
||||||
pWFVJ9JCZnFdoMGuEWRY3aqfvxcfh5ym54WRJwIDAQABAoIBABNq2UJIqZev6scT
|
eXbkRrL7OzCJDXPH59lREaAuvUF/hivswFBmfwIDAQABAoIBAQCYa9gj11Vf/0wt
|
||||||
CsoMXY7eHrgjnuoZF1pvMAEaJMGaOuVDSZkM2KsGeF7lZnKoIwQhQQB+R3HBwaFk
|
kh9WNJhGJ9d2q5hVleR0H9q9FPg1xSPAOTYEnXBrjrO89CzY1xq/L7DKzDbVvSuM
|
||||||
RsmP125sPFobkFP0LPxrzZWkYkGwwEzacoAQBuj7uFxOayAuBXTe0CGjbRA7z4QH
|
GmcOxfTdSkkcCs0Y6o7WWsTDv8ws1frFIPPmkpBOtPhDRHS1+eq38akkgKZ+P1te
|
||||||
DgiejNqfXhp4nHdxaiL5Lq1b7SlmarGXup3kcVTWxIiah4MK0o4YGiyQC8Mr+a7w
|
mMiNIwQtAE6jWPuvcTIVee9QwaCn+5ZYIwICORNFoLsl7sKdLOfccSO7v9L/Az5r
|
||||||
UGYqdKQQMLOtly/HTEcyd/DAruboV+5L+pYx/pcFFXJupK6yaxELLHKeHAKA9MmA
|
AT4xrJwpKl5MjOGzOxFv6M1rTh/Y9e17U+2/QQDnW4U7C4/gkQ1urJddaeDDnz8t
|
||||||
cnMNVpCQ8VdOyR9qrfwtABqd8egKea2Z3P+dK6PlxUAQe2kYlXxS0N+i/eU6PKYM
|
GLAnshCdF8eL3vAKO6sMJiEGuVe3b2oBYrRjp7FSB1uLWWlFRb7TGE07UXP0JZDn
|
||||||
B76UhQECgYEA9GvBNG6ffQqkX6bNLQUsU+nvKAQeFq02ua9LFKYw2sVO6RfUjrNz
|
W1lmUbcBAoGBAO4/37Obk1pM6GQzS49AwLJtz5Z9DpxMSaVW6XHQlOq6RBNQsMR0
|
||||||
u2cwAUXSp+tnPMesKEVOOUfRMN/QiI/JNw62uSWSKJ/64103vX+F5AjQmE2f7Zgt
|
MS5k5TZgX0HZXAu0dGaPNzD7868dwTZE7tn6a1QanfmrqQVbJxHWTJtPEE0QGpGI
|
||||||
o3X23cV544HM8E5xCvIe7DFLK6cUdRQngu/uWi63cB4hMVpB9MfZJscCgYEA3kne
|
vg2D6iiYUE0mVEiKf7dNp6hpp9ioYIdsRQK0H/u3sU/JfEFr00XpMb+BAoGBANFp
|
||||||
2sE4b67JkjmHGKahBJM5/iAHBqSubQmufIlaiLkyrDYGN2D+mi0fAF+uQ9KmNOrv
|
wMIcB7RbyShO8QR/kVpahlOOnNDP7e+9KdUFl8i300ecO2QNR+hlQ+565J8nsANj
|
||||||
TsZ1bZu9f+VvaH7xNJzcUriXYs+HoN9/CWnAR0ktSm8RN7BznVd41NuLnsoWUt41
|
Y2kLMls2DTzMefrEZPecb8onjGFmSkwf9uCs8vmorlYmYmNlJkLL06ZN3SrpmHBD
|
||||||
jglpNYMwy7JPRLQNgYHErG7puksNawFvSKQEYqECgYA4N/iueKtSdXotTg5vRntV
|
GogkCt1qkrTgtszjSqZe94UcpT+mfatSK4lRlSX/AoGAXdE7Ns/Ji6KDVIm6dFOs
|
||||||
qb8KczgAe0LVHs6kJz2hdDScRJDtabU665cNE+RKH0kVn8+nS5mcbzpchX5PitL7
|
TdbeCsV+DmAgFAKQdKgNLA1jJzP8F7Aleb5zYCE9AYIlM9rAh25X7msYf1m5LrSg
|
||||||
SPUaTNv7YCCy3yQNACHpu2VPQruASLpmmKF5jQxmGdrrgv9ZRyt5pDToC3wXGdWk
|
Vae9weWlVZ6aNSi6ztRTYEkXAzGXNL3jERFkEM5BuM+iGtqnBjiHD9NjK/bJ5Cnn
|
||||||
tk8aixhCP4ve8CWvibAWzQKBgExxxwwf6tKtn3CEDCu0EifKoeT9Cq2EMOAatkDp
|
VvQ1L/sa0G9oBZ7/GCWG2IECgYEAqAOmENb2Y4FEyl9Txl0nXIvGvCFutaYt66wk
|
||||||
05K1bfG/Wn/tAWHwJnswbHOym6oTKV1D7tpU9uRm+NtM3JKlZzejd5xpllECy2Nn
|
dPIQzoyWKh0yFVsGd3FP6HWXGg54jK9gIfZGx6F9S2tu7oBF1dggZNwIKFkugRcg
|
||||||
VNKvHb49WAR40CnKDSnWnrtq8CZreKtyHRZkGYHTvmL4MLTa9dH/Cq4gZWrpQWYP
|
NzDrnNz2Ss5vH/oWkX8BZ6uPKA/VKzTbg6EPSohn/lFQuOAfk44cHyNVfdTxfNPn
|
||||||
0dpBAoGAD4inpSm7SMN3/rgYXEU1CMRKXREbEWhondiTXZ8x8ugnnYtfhcBvCMif
|
dDwNYzcCgYEA3Wig1HRNvTOnSskFz8eTmpu89hg1atuAk+c2d1Z+9HKVjkc6B/91
|
||||||
JQ8tso63hCHvKPDViTbLDyV7OuGBEPTQAyacX0FJmr7g5ERlvfmL4yjmvW7Bcclh
|
pabnbujERtH3HW9TQ9+VVfImgC3Jy+TjsS3d6nA7e9060N9z30mVEY5lq03mKAMl
|
||||||
yrgbJXl2pdzMt9GpogIYFW0YyOr6VPIrGf62kRNrv2E8wyXEFAI=
|
tSKFk4fRRxsKPsBN/NS0BiU8LJTzsDwLwRm9T4BNos+I35a8tFCCmtw=
|
||||||
-----END RSA PRIVATE KEY-----
|
-----END RSA PRIVATE KEY-----
|
||||||
|
|
|
@ -181,6 +181,11 @@ node.name = emqx@127.0.0.1
|
||||||
## Value: String
|
## Value: String
|
||||||
node.cookie = emqxsecretcookie
|
node.cookie = emqxsecretcookie
|
||||||
|
|
||||||
|
## Node Max Clients Size.
|
||||||
|
##
|
||||||
|
## Value: String
|
||||||
|
node.max_clients = 1024000
|
||||||
|
|
||||||
## Data dir for the node
|
## Data dir for the node
|
||||||
##
|
##
|
||||||
## Value: Folder
|
## Value: Folder
|
||||||
|
@ -942,7 +947,7 @@ listener.tcp.external.access.1 = allow all
|
||||||
## Enable the option for X.509 certificate based authentication.
|
## Enable the option for X.509 certificate based authentication.
|
||||||
## EMQX will use the common name of certificate as MQTT username.
|
## EMQX will use the common name of certificate as MQTT username.
|
||||||
##
|
##
|
||||||
## Value: cn | dn
|
## Value: cn | dn | crt
|
||||||
## listener.tcp.external.peer_cert_as_username = cn
|
## listener.tcp.external.peer_cert_as_username = cn
|
||||||
|
|
||||||
## The TCP backlog defines the maximum length that the queue of pending
|
## The TCP backlog defines the maximum length that the queue of pending
|
||||||
|
@ -1293,10 +1298,10 @@ listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
|
||||||
## Value: on | off
|
## Value: on | off
|
||||||
## listener.ssl.external.honor_cipher_order = on
|
## listener.ssl.external.honor_cipher_order = on
|
||||||
|
|
||||||
## Use the CN, EN or CRT field from the client certificate as a username.
|
## Use the CN, DN or CRT field from the client certificate as a username.
|
||||||
## Notice that 'verify' should be set as 'verify_peer'.
|
## Notice that 'verify' should be set as 'verify_peer'.
|
||||||
##
|
##
|
||||||
## Value: cn | en | crt
|
## Value: cn | dn | crt
|
||||||
## listener.ssl.external.peer_cert_as_username = cn
|
## listener.ssl.external.peer_cert_as_username = cn
|
||||||
|
|
||||||
## TCP backlog for the SSL connection.
|
## TCP backlog for the SSL connection.
|
||||||
|
|
|
@ -41,5 +41,5 @@
|
||||||
|
|
||||||
-define(LOG(Level, Format, Args),
|
-define(LOG(Level, Format, Args),
|
||||||
begin
|
begin
|
||||||
(logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end}))
|
(logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}))
|
||||||
end).
|
end).
|
||||||
|
|
|
@ -334,6 +334,13 @@ end}.
|
||||||
hidden
|
hidden
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
%% @see node.max_clients
|
||||||
|
{mapping, "node.max_clients", "emqx.max_clients", [
|
||||||
|
{default, 1024000},
|
||||||
|
{datatype, integer},
|
||||||
|
hidden
|
||||||
|
]}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% RPC
|
%% RPC
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -4,7 +4,7 @@
|
||||||
{gproc, "0.8.0"}, % hex
|
{gproc, "0.8.0"}, % hex
|
||||||
{replayq, "0.1.1"}, %hex
|
{replayq, "0.1.1"}, %hex
|
||||||
{esockd, "5.5.0"}, %hex
|
{esockd, "5.5.0"}, %hex
|
||||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.8"}}},
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.9"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
|
||||||
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
%%-*- mode: erlang -*-
|
||||||
|
%% .app.src.script
|
||||||
|
|
||||||
|
Config = case os:getenv("EMQX_DESC") of
|
||||||
|
false -> CONFIG; % env var not defined
|
||||||
|
[] -> CONFIG; % env var set to empty string
|
||||||
|
Desc ->
|
||||||
|
[begin
|
||||||
|
AppConf0 = lists:keystore(description, 1, AppConf, {description, Desc}),
|
||||||
|
{application, App, AppConf0}
|
||||||
|
end || Conf = {application, App, AppConf} <- CONFIG]
|
||||||
|
end,
|
||||||
|
|
||||||
|
RemoveLeadingV =
|
||||||
|
fun(Tag) ->
|
||||||
|
case re:run(Tag, "v\[0-9\]+\.\[0-9\]+\.*") of
|
||||||
|
nomatch ->
|
||||||
|
Tag;
|
||||||
|
{match, _} ->
|
||||||
|
%% if it is a version number prefixed by 'v' then remove the 'v'
|
||||||
|
"v" ++ Vsn = Tag,
|
||||||
|
Vsn
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
|
case os:getenv("EMQX_DEPS_DEFAULT_VSN") of
|
||||||
|
false -> Config; % env var not defined
|
||||||
|
[] -> Config; % env var set to empty string
|
||||||
|
Tag ->
|
||||||
|
[begin
|
||||||
|
AppConf0 = lists:keystore(vsn, 1, AppConf, {vsn, RemoveLeadingV(Tag)}),
|
||||||
|
{application, App, AppConf0}
|
||||||
|
end || Conf = {application, App, AppConf} <- Config]
|
||||||
|
end.
|
|
@ -256,7 +256,7 @@ aggre(Routes) ->
|
||||||
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
|
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
|
||||||
-> emqx_types:deliver_result()).
|
-> emqx_types:deliver_result()).
|
||||||
forward(Node, To, Delivery, async) ->
|
forward(Node, To, Delivery, async) ->
|
||||||
case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of
|
case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
{badrpc, Reason} ->
|
{badrpc, Reason} ->
|
||||||
?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
|
?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
|
||||||
|
@ -264,7 +264,7 @@ forward(Node, To, Delivery, async) ->
|
||||||
end;
|
end;
|
||||||
|
|
||||||
forward(Node, To, Delivery, sync) ->
|
forward(Node, To, Delivery, sync) ->
|
||||||
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
|
case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||||
{badrpc, Reason} ->
|
{badrpc, Reason} ->
|
||||||
?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
|
?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
|
||||||
{error, badrpc};
|
{error, badrpc};
|
||||||
|
|
|
@ -149,7 +149,14 @@ call(CPid, Req) ->
|
||||||
|
|
||||||
init({Transport, RawSocket, Options}) ->
|
init({Transport, RawSocket, Options}) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
{ok, Socket} = Transport:wait(RawSocket),
|
case Transport:wait(RawSocket) of
|
||||||
|
{ok, Socket} ->
|
||||||
|
do_init(Transport, Socket, Options);
|
||||||
|
{error, Reason} ->
|
||||||
|
?LOG(warning, "connection failed to establish: ~p", [Reason])
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_init(Transport, Socket, Options) ->
|
||||||
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
|
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
|
||||||
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
|
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
|
||||||
Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
|
Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
|
||||||
|
@ -352,11 +359,11 @@ handle(info, {timeout, Timer, emit_stats},
|
||||||
{keep_state, NState#state{gc_state = GcState1}, hibernate};
|
{keep_state, NState#state{gc_state = GcState1}, hibernate};
|
||||||
{shutdown, Reason} ->
|
{shutdown, Reason} ->
|
||||||
?LOG(error, "Shutdown exceptionally due to ~p", [Reason]),
|
?LOG(error, "Shutdown exceptionally due to ~p", [Reason]),
|
||||||
shutdown(Reason, NState)
|
self() ! {shutdown, Reason}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle(info, {shutdown, discard, {ClientId, ByPid}}, State) ->
|
handle(info, {shutdown, discard, {ClientId, ByPid}}, State) ->
|
||||||
?LOG(error, "Discarded by ~s:~p", [ClientId, ByPid]),
|
?LOG(warning, "Discarded by ~s:~p", [ClientId, ByPid]),
|
||||||
shutdown(discard, State);
|
shutdown(discard, State);
|
||||||
|
|
||||||
handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) ->
|
handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) ->
|
||||||
|
|
|
@ -44,6 +44,8 @@
|
||||||
|
|
||||||
-export([lookup_conn_pid/1]).
|
-export([lookup_conn_pid/1]).
|
||||||
|
|
||||||
|
-export([max_client_size/0]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([ init/1
|
-export([ init/1
|
||||||
, handle_call/3
|
, handle_call/3
|
||||||
|
@ -148,6 +150,9 @@ lookup_conn_pid(ClientId) when is_binary(ClientId) ->
|
||||||
notify(Msg) ->
|
notify(Msg) ->
|
||||||
gen_server:cast(?CM, {notify, Msg}).
|
gen_server:cast(?CM, {notify, Msg}).
|
||||||
|
|
||||||
|
max_client_size() ->
|
||||||
|
ets:info(?CONN_TAB, size).
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
|
|
|
@ -115,7 +115,7 @@ run_fold(HookPoint, Args, Acc) ->
|
||||||
|
|
||||||
|
|
||||||
do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
|
do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
|
||||||
case filter_passed(Filter, Args) andalso execute(Action, Args) of
|
case filter_passed(Filter, Args) andalso safe_execute(Action, Args) of
|
||||||
%% stop the hook chain and return
|
%% stop the hook chain and return
|
||||||
stop -> ok;
|
stop -> ok;
|
||||||
%% continue the hook chain, in following cases:
|
%% continue the hook chain, in following cases:
|
||||||
|
@ -128,7 +128,7 @@ do_run([], _Args) ->
|
||||||
|
|
||||||
do_run_fold([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
|
do_run_fold([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
|
||||||
Args1 = Args ++ [Acc],
|
Args1 = Args ++ [Acc],
|
||||||
case filter_passed(Filter, Args1) andalso execute(Action, Args1) of
|
case filter_passed(Filter, Args1) andalso safe_execute(Action, Args1) of
|
||||||
%% stop the hook chain
|
%% stop the hook chain
|
||||||
stop -> Acc;
|
stop -> Acc;
|
||||||
%% stop the hook chain with NewAcc
|
%% stop the hook chain with NewAcc
|
||||||
|
@ -148,6 +148,15 @@ filter_passed(undefined, _Args) -> true;
|
||||||
filter_passed(Filter, Args) ->
|
filter_passed(Filter, Args) ->
|
||||||
execute(Filter, Args).
|
execute(Filter, Args).
|
||||||
|
|
||||||
|
safe_execute(Fun, Args) ->
|
||||||
|
try execute(Fun, Args) of
|
||||||
|
Result -> Result
|
||||||
|
catch
|
||||||
|
_:Reason:Stacktrace ->
|
||||||
|
?LOG(error, "Failed to execute ~p(~p): ~p", [Fun, Args, {Reason, Stacktrace}]),
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
%% @doc execute a function.
|
%% @doc execute a function.
|
||||||
execute(Fun, Args) when is_function(Fun) ->
|
execute(Fun, Args) when is_function(Fun) ->
|
||||||
erlang:apply(Fun, Args);
|
erlang:apply(Fun, Args);
|
||||||
|
|
|
@ -30,8 +30,7 @@ init([]) ->
|
||||||
child_spec(emqx_stats, worker),
|
child_spec(emqx_stats, worker),
|
||||||
child_spec(emqx_metrics, worker),
|
child_spec(emqx_metrics, worker),
|
||||||
child_spec(emqx_ctl, worker),
|
child_spec(emqx_ctl, worker),
|
||||||
child_spec(emqx_zone, worker),
|
child_spec(emqx_zone, worker)]}}.
|
||||||
child_spec(emqx_tracer, worker)]}}.
|
|
||||||
|
|
||||||
child_spec(M, worker) ->
|
child_spec(M, worker) ->
|
||||||
#{id => M,
|
#{id => M,
|
||||||
|
|
|
@ -75,7 +75,7 @@ format(#{level:=Level,msg:=Msg0,meta:=Meta},Config0)
|
||||||
end,
|
end,
|
||||||
Config#{chars_limit=>Size}
|
Config#{chars_limit=>Size}
|
||||||
end,
|
end,
|
||||||
string:trim(format_msg(Msg0,Meta,Config1));
|
format_msg(Msg0,Meta,Config1);
|
||||||
true ->
|
true ->
|
||||||
""
|
""
|
||||||
end,
|
end,
|
||||||
|
@ -134,7 +134,7 @@ to_string(X,_) when is_list(X) ->
|
||||||
_ -> io_lib:format(?FormatP,[X])
|
_ -> io_lib:format(?FormatP,[X])
|
||||||
end;
|
end;
|
||||||
to_string(X,_) ->
|
to_string(X,_) ->
|
||||||
io_lib:format("~s",[X]).
|
io_lib:format(?FormatP,[X]).
|
||||||
|
|
||||||
printable_list([]) ->
|
printable_list([]) ->
|
||||||
false;
|
false;
|
||||||
|
@ -190,14 +190,14 @@ do_format_msg({Format0,Args},Depth,Opts) ->
|
||||||
Format = reformat(Format1, Depth),
|
Format = reformat(Format1, Depth),
|
||||||
io_lib:build_text(Format,Opts)
|
io_lib:build_text(Format,Opts)
|
||||||
catch C:R:S ->
|
catch C:R:S ->
|
||||||
FormatError = "FORMAT ERROR: ~0tp - ~0tp",
|
FormatError = "FORMAT ERROR: ~0tp - ~0tp",
|
||||||
case Format0 of
|
case Format0 of
|
||||||
FormatError ->
|
FormatError ->
|
||||||
%% already been here - avoid failing cyclically
|
%% already been here - avoid failing cyclically
|
||||||
erlang:raise(C,R,S);
|
erlang:raise(C,R,S);
|
||||||
_ ->
|
_ ->
|
||||||
format_msg({FormatError,[Format0,Args]},Depth,Opts)
|
do_format_msg({FormatError,[Format0,Args]},Depth,Opts)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
reformat(Format,unlimited) ->
|
reformat(Format,unlimited) ->
|
||||||
|
|
|
@ -294,12 +294,12 @@ do_inc_recv(?PACKET(?DISCONNECT)) ->
|
||||||
do_inc_recv(?PACKET(?AUTH)) ->
|
do_inc_recv(?PACKET(?AUTH)) ->
|
||||||
inc('packets.auth.received');
|
inc('packets.auth.received');
|
||||||
do_inc_recv(_Packet) ->
|
do_inc_recv(_Packet) ->
|
||||||
ignore.
|
ok.
|
||||||
|
|
||||||
%% @doc Inc packets sent. Will not count $SYS PUBLISH.
|
%% @doc Inc packets sent. Will not count $SYS PUBLISH.
|
||||||
-spec(inc_sent(emqx_mqtt_types:packet()) -> ok | ignore).
|
-spec(inc_sent(emqx_mqtt_types:packet()) -> ok).
|
||||||
inc_sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) ->
|
inc_sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) ->
|
||||||
ignore;
|
ok;
|
||||||
inc_sent(Packet) ->
|
inc_sent(Packet) ->
|
||||||
inc('packets.sent'),
|
inc('packets.sent'),
|
||||||
do_inc_sent(Packet).
|
do_inc_sent(Packet).
|
||||||
|
@ -341,7 +341,7 @@ do_inc_sent(?PACKET(?DISCONNECT)) ->
|
||||||
do_inc_sent(?PACKET(?AUTH)) ->
|
do_inc_sent(?PACKET(?AUTH)) ->
|
||||||
inc('packets.auth.sent');
|
inc('packets.auth.sent');
|
||||||
do_inc_sent(_Packet) ->
|
do_inc_sent(_Packet) ->
|
||||||
ignore.
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
@ -356,7 +356,7 @@ init([]) ->
|
||||||
% Store reserved indices
|
% Store reserved indices
|
||||||
lists:foreach(fun({Type, Name}) ->
|
lists:foreach(fun({Type, Name}) ->
|
||||||
Idx = reserved_idx(Name),
|
Idx = reserved_idx(Name),
|
||||||
Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)},
|
Metric = #metric{name = Name, type = Type, idx = Idx},
|
||||||
true = ets:insert(?TAB, Metric),
|
true = ets:insert(?TAB, Metric),
|
||||||
ok = counters:put(CRef, Idx, 0)
|
ok = counters:put(CRef, Idx, 0)
|
||||||
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS),
|
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS),
|
||||||
|
|
|
@ -15,9 +15,7 @@
|
||||||
-module(emqx_mountpoint).
|
-module(emqx_mountpoint).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("logger.hrl").
|
-include("types.hrl").
|
||||||
|
|
||||||
-logger_header("[Mountpoint]").
|
|
||||||
|
|
||||||
-export([ mount/2
|
-export([ mount/2
|
||||||
, unmount/2
|
, unmount/2
|
||||||
|
@ -32,30 +30,34 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
mount(undefined, Any) ->
|
mount(undefined, Any) ->
|
||||||
Any;
|
Any;
|
||||||
|
mount(MountPoint, Topic) when is_binary(Topic) ->
|
||||||
|
prefix(MountPoint, Topic);
|
||||||
mount(MountPoint, Msg = #message{topic = Topic}) ->
|
mount(MountPoint, Msg = #message{topic = Topic}) ->
|
||||||
Msg#message{topic = <<MountPoint/binary, Topic/binary>>};
|
Msg#message{topic = prefix(MountPoint, Topic)};
|
||||||
|
|
||||||
mount(MountPoint, TopicFilters) when is_list(TopicFilters) ->
|
mount(MountPoint, TopicFilters) when is_list(TopicFilters) ->
|
||||||
[{<<MountPoint/binary, Topic/binary>>, SubOpts} || {Topic, SubOpts} <- TopicFilters].
|
[{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters].
|
||||||
|
|
||||||
unmount(undefined, Msg) ->
|
unmount(undefined, Any) ->
|
||||||
Msg;
|
Any;
|
||||||
|
unmount(MountPoint, Topic) when is_binary(Topic) ->
|
||||||
|
case string:prefix(Topic, MountPoint) of
|
||||||
|
nomatch -> Topic;
|
||||||
|
Topic1 -> Topic1
|
||||||
|
end;
|
||||||
unmount(MountPoint, Msg = #message{topic = Topic}) ->
|
unmount(MountPoint, Msg = #message{topic = Topic}) ->
|
||||||
try split_binary(Topic, byte_size(MountPoint)) of
|
case string:prefix(Topic, MountPoint) of
|
||||||
{MountPoint, Topic1} -> Msg#message{topic = Topic1}
|
nomatch -> Msg;
|
||||||
catch
|
Topic1 -> Msg#message{topic = Topic1}
|
||||||
_Error:Reason ->
|
|
||||||
?LOG(error, "Unmount error : ~p", [Reason]),
|
|
||||||
Msg
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec(replvar(maybe(mountpoint()), map()) -> maybe(mountpoint())).
|
||||||
replvar(undefined, _Vars) ->
|
replvar(undefined, _Vars) ->
|
||||||
undefined;
|
undefined;
|
||||||
replvar(MountPoint, #{client_id := ClientId, username := Username}) ->
|
replvar(MountPoint, #{client_id := ClientId, username := Username}) ->
|
||||||
lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
|
lists:foldl(fun feed_var/2, MountPoint,
|
||||||
|
[{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
|
||||||
|
|
||||||
feed_var({<<"%c">>, ClientId}, MountPoint) ->
|
feed_var({<<"%c">>, ClientId}, MountPoint) ->
|
||||||
emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
|
emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
|
||||||
|
@ -64,3 +66,5 @@ feed_var({<<"%u">>, undefined}, MountPoint) ->
|
||||||
feed_var({<<"%u">>, Username}, MountPoint) ->
|
feed_var({<<"%u">>, Username}, MountPoint) ->
|
||||||
emqx_topic:feed_var(<<"%u">>, Username, MountPoint).
|
emqx_topic:feed_var(<<"%u">>, Username, MountPoint).
|
||||||
|
|
||||||
|
prefix(MountPoint, Topic) ->
|
||||||
|
<<MountPoint/binary, Topic/binary>>.
|
|
@ -126,31 +126,13 @@ default_caps() ->
|
||||||
?DEFAULT_CAPS.
|
?DEFAULT_CAPS.
|
||||||
|
|
||||||
get_caps(Zone, publish) ->
|
get_caps(Zone, publish) ->
|
||||||
with_env(Zone, '$mqtt_pub_caps',
|
filter_caps(?PUBCAP_KEYS, get_caps(Zone));
|
||||||
fun() ->
|
|
||||||
filter_caps(?PUBCAP_KEYS, get_caps(Zone))
|
|
||||||
end);
|
|
||||||
|
|
||||||
get_caps(Zone, subscribe) ->
|
get_caps(Zone, subscribe) ->
|
||||||
with_env(Zone, '$mqtt_sub_caps',
|
filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
|
||||||
fun() ->
|
|
||||||
filter_caps(?SUBCAP_KEYS, get_caps(Zone))
|
|
||||||
end).
|
|
||||||
|
|
||||||
get_caps(Zone) ->
|
get_caps(Zone) ->
|
||||||
with_env(Zone, '$mqtt_caps',
|
maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)} || {Cap, Def} <- ?DEFAULT_CAPS]).
|
||||||
fun() ->
|
|
||||||
maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)}
|
|
||||||
|| {Cap, Def} <- ?DEFAULT_CAPS])
|
|
||||||
end).
|
|
||||||
|
|
||||||
filter_caps(Keys, Caps) ->
|
filter_caps(Keys, Caps) ->
|
||||||
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
|
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
|
||||||
|
|
||||||
with_env(Zone, Key, InitFun) ->
|
|
||||||
case emqx_zone:get_env(Zone, Key) of
|
|
||||||
undefined -> Caps = InitFun(),
|
|
||||||
ok = emqx_zone:set_env(Zone, Key, Caps),
|
|
||||||
Caps;
|
|
||||||
ZoneCaps -> ZoneCaps
|
|
||||||
end.
|
|
||||||
|
|
|
@ -166,4 +166,7 @@ call(Req) ->
|
||||||
gen_server:call(?OS_MON, Req, infinity).
|
gen_server:call(?OS_MON, Req, infinity).
|
||||||
|
|
||||||
ensure_check_timer(State = #{cpu_check_interval := Interval}) ->
|
ensure_check_timer(State = #{cpu_check_interval := Interval}) ->
|
||||||
State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}.
|
case erlang:system_info(system_architecture) of
|
||||||
|
"x86_64-pc-linux-musl" -> State;
|
||||||
|
_ -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}
|
||||||
|
end.
|
|
@ -162,10 +162,11 @@ will_msg(#mqtt_packet_connect{client_id = ClientId,
|
||||||
will_qos = QoS,
|
will_qos = QoS,
|
||||||
will_topic = Topic,
|
will_topic = Topic,
|
||||||
will_props = Properties,
|
will_props = Properties,
|
||||||
will_payload = Payload}) ->
|
will_payload = Payload,
|
||||||
|
proto_ver = ProtoVer}) ->
|
||||||
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
|
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
|
||||||
Msg#message{flags = #{dup => false, retain => Retain},
|
Msg#message{flags = #{dup => false, retain => Retain},
|
||||||
headers = merge_props(#{username => Username}, Properties)}.
|
headers = merge_props(#{username => Username, proto_ver => ProtoVer}, Properties)}.
|
||||||
|
|
||||||
merge_props(Headers, undefined) ->
|
merge_props(Headers, undefined) ->
|
||||||
Headers;
|
Headers;
|
||||||
|
|
|
@ -260,10 +260,22 @@ set_protover(_Packet, PState) ->
|
||||||
received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
|
received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
|
||||||
{error, proto_not_connected, PState};
|
{error, proto_not_connected, PState};
|
||||||
|
|
||||||
|
received(Packet = ?PACKET(?CONNECT), PState = #pstate{connected = false}) ->
|
||||||
|
case check_max_clients() of
|
||||||
|
true ->
|
||||||
|
?LOG(error, "Connection rejected due to max clients limitation"),
|
||||||
|
connack({?RC_QUOTA_EXCEEDED, PState#pstate{credentials = credentials(PState)}});
|
||||||
|
false ->
|
||||||
|
do_received(Packet, PState)
|
||||||
|
end;
|
||||||
|
|
||||||
received(?PACKET(?CONNECT), PState = #pstate{connected = true}) ->
|
received(?PACKET(?CONNECT), PState = #pstate{connected = true}) ->
|
||||||
{error, proto_unexpected_connect, PState};
|
{error, proto_unexpected_connect, PState};
|
||||||
|
|
||||||
received(Packet = ?PACKET(Type), PState) ->
|
received(Packet, PState) ->
|
||||||
|
do_received(Packet, PState).
|
||||||
|
|
||||||
|
do_received(Packet = ?PACKET(Type), PState) ->
|
||||||
trace(recv, Packet),
|
trace(recv, Packet),
|
||||||
PState1 = set_protover(Packet, PState),
|
PState1 = set_protover(Packet, PState),
|
||||||
try emqx_packet:validate(Packet) of
|
try emqx_packet:validate(Packet) of
|
||||||
|
@ -562,10 +574,10 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Creden
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
|
do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
|
||||||
PState = #pstate{session = SPid, credentials = Credentials}) ->
|
PState = #pstate{session = SPid, credentials = Credentials, proto_ver = ProtoVer}) ->
|
||||||
Msg = emqx_mountpoint:mount(mountpoint(Credentials),
|
Msg = emqx_mountpoint:mount(mountpoint(Credentials),
|
||||||
emqx_packet:to_message(Credentials, Packet)),
|
emqx_packet:to_message(Credentials, Packet)),
|
||||||
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState).
|
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, emqx_message:set_header(proto_ver, ProtoVer, Msg))), PState).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Puback -> Client
|
%% Puback -> Client
|
||||||
|
@ -834,8 +846,8 @@ check_will_retain(#mqtt_packet_connect{will_retain = false, proto_ver = ?MQTT_PR
|
||||||
ok;
|
ok;
|
||||||
check_will_retain(#mqtt_packet_connect{will_retain = true, proto_ver = ?MQTT_PROTO_V5}, #pstate{zone = Zone}) ->
|
check_will_retain(#mqtt_packet_connect{will_retain = true, proto_ver = ?MQTT_PROTO_V5}, #pstate{zone = Zone}) ->
|
||||||
case emqx_zone:get_env(Zone, mqtt_retain_available, true) of
|
case emqx_zone:get_env(Zone, mqtt_retain_available, true) of
|
||||||
true -> {error, ?RC_RETAIN_NOT_SUPPORTED};
|
true -> ok;
|
||||||
false -> ok
|
false -> {error, ?RC_RETAIN_NOT_SUPPORTED}
|
||||||
end;
|
end;
|
||||||
check_will_retain(_Packet, _PState) ->
|
check_will_retain(_Packet, _PState) ->
|
||||||
ok.
|
ok.
|
||||||
|
@ -1027,7 +1039,7 @@ raw_topic_filters(#pstate{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridg
|
||||||
end.
|
end.
|
||||||
|
|
||||||
mountpoint(Credentials) ->
|
mountpoint(Credentials) ->
|
||||||
maps:get(mountpoint, Credentials, undefined).
|
emqx_mountpoint:replvar(maps:get(mountpoint, Credentials, undefined), Credentials).
|
||||||
|
|
||||||
do_check_banned(_EnableBan = true, Credentials) ->
|
do_check_banned(_EnableBan = true, Credentials) ->
|
||||||
case emqx_banned:check(Credentials) of
|
case emqx_banned:check(Credentials) of
|
||||||
|
@ -1048,3 +1060,8 @@ do_acl_check(Action, Credentials, Topic, AllowTerm, DenyTerm) ->
|
||||||
allow -> AllowTerm;
|
allow -> AllowTerm;
|
||||||
deny -> DenyTerm
|
deny -> DenyTerm
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
check_max_clients() ->
|
||||||
|
CurrentClientSize = emqx_cm:max_client_size(),
|
||||||
|
MaxClients = emqx_config:get_env(max_clients, 1024000),
|
||||||
|
CurrentClientSize >= MaxClients.
|
|
@ -16,24 +16,41 @@
|
||||||
-module(emqx_rpc).
|
-module(emqx_rpc).
|
||||||
|
|
||||||
-export([ call/4
|
-export([ call/4
|
||||||
|
, call/5
|
||||||
, cast/4
|
, cast/4
|
||||||
|
, cast/5
|
||||||
, multicall/4
|
, multicall/4
|
||||||
|
, multicall/5
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(RPC, gen_rpc).
|
-define(RPC, gen_rpc).
|
||||||
|
|
||||||
|
-define(DefaultClientNum, 1).
|
||||||
|
|
||||||
call(Node, Mod, Fun, Args) ->
|
call(Node, Mod, Fun, Args) ->
|
||||||
filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)).
|
filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)).
|
||||||
|
|
||||||
|
call(Key, Node, Mod, Fun, Args) ->
|
||||||
|
filter_result(?RPC:call(rpc_node({Key, Node}), Mod, Fun, Args)).
|
||||||
|
|
||||||
multicall(Nodes, Mod, Fun, Args) ->
|
multicall(Nodes, Mod, Fun, Args) ->
|
||||||
filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).
|
filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).
|
||||||
|
|
||||||
|
multicall(Key, Nodes, Mod, Fun, Args) ->
|
||||||
|
filter_result(?RPC:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)).
|
||||||
|
|
||||||
cast(Node, Mod, Fun, Args) ->
|
cast(Node, Mod, Fun, Args) ->
|
||||||
filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).
|
filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).
|
||||||
|
|
||||||
rpc_node(Node) ->
|
cast(Key, Node, Mod, Fun, Args) ->
|
||||||
{ok, ClientNum} = application:get_env(gen_rpc, tcp_client_num),
|
filter_result(?RPC:cast(rpc_node({Key, Node}), Mod, Fun, Args)).
|
||||||
{Node, rand:uniform(ClientNum)}.
|
|
||||||
|
rpc_node(Node) when is_atom(Node) ->
|
||||||
|
ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
|
||||||
|
{Node, rand:uniform(ClientNum)};
|
||||||
|
rpc_node({Key, Node}) when is_atom(Node) ->
|
||||||
|
ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
|
||||||
|
{Node, erlang:phash2(Key, ClientNum) + 1}.
|
||||||
|
|
||||||
rpc_nodes(Nodes) ->
|
rpc_nodes(Nodes) ->
|
||||||
rpc_nodes(Nodes, []).
|
rpc_nodes(Nodes, []).
|
||||||
|
@ -43,7 +60,6 @@ rpc_nodes([], Acc) ->
|
||||||
rpc_nodes([Node | Nodes], Acc) ->
|
rpc_nodes([Node | Nodes], Acc) ->
|
||||||
rpc_nodes(Nodes, [rpc_node(Node) | Acc]).
|
rpc_nodes(Nodes, [rpc_node(Node) | Acc]).
|
||||||
|
|
||||||
|
|
||||||
filter_result({Error, Reason})
|
filter_result({Error, Reason})
|
||||||
when Error =:= badrpc; Error =:= badtcp ->
|
when Error =:= badrpc; Error =:= badtcp ->
|
||||||
{badrpc, Reason};
|
{badrpc, Reason};
|
||||||
|
|
|
@ -401,13 +401,22 @@ handle_call(stats, _From, State) ->
|
||||||
reply(stats(State), State);
|
reply(stats(State), State);
|
||||||
|
|
||||||
handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
|
handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
|
||||||
?LOG(warning, "Discarded by ~p", [ByPid]),
|
?LOG(notice, "Discarded by ~p", [ByPid]),
|
||||||
{stop, {shutdown, discarded}, ok, State};
|
{stop, {shutdown, discarded}, ok, State};
|
||||||
|
|
||||||
handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
|
handle_call({discard, ByPid}, _From, State = #state{conn_pid = ConnPid, client_id = ClientId}) ->
|
||||||
?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid]),
|
?LOG(notice, "Conn ~p is discarded by ~p", [ConnPid, ByPid]),
|
||||||
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
|
case ClientId of
|
||||||
{stop, {shutdown, discarded}, ok, State};
|
<<"d:", _Sn/binary>> ->
|
||||||
|
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
|
||||||
|
{stop, {shutdown, discarded}, ok, State};
|
||||||
|
_ ->
|
||||||
|
Topic = <<"$SYS/kickout">>,
|
||||||
|
Msg = emqx_message:make(broker, 1, Topic, <<"The client has been kicked out">>),
|
||||||
|
{_, State1} = handle_dispatch([{Topic, Msg}], State),
|
||||||
|
erlang:send_after(5000, self(), {kicked, ByPid}),
|
||||||
|
{reply, ok, State1}
|
||||||
|
end;
|
||||||
|
|
||||||
%% PUBLISH: This is only to register packetId to session state.
|
%% PUBLISH: This is only to register packetId to session state.
|
||||||
%% The actual message dispatching should be done by the caller (e.g. connection) process.
|
%% The actual message dispatching should be done by the caller (e.g. connection) process.
|
||||||
|
@ -463,12 +472,17 @@ handle_call(Req, _From, State) ->
|
||||||
|
|
||||||
%% SUBSCRIBE:
|
%% SUBSCRIBE:
|
||||||
handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
|
handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
|
||||||
State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
|
State = #state{zone = Zone, client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
|
||||||
|
MaxSub = get_env(Zone, max_subscriptions, 0),
|
||||||
{ReasonCodes, Subscriptions1} =
|
{ReasonCodes, Subscriptions1} =
|
||||||
lists:foldr(
|
lists:foldr(
|
||||||
fun ({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) when
|
fun ({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) when ?IS_QOS(RC) ->
|
||||||
RC == ?QOS_0; RC == ?QOS_1; RC == ?QOS_2 ->
|
case exceeded_subscription_quota(MaxSub, SubMap) of
|
||||||
{[QoS|RcAcc], do_subscribe(ClientId, Username, Topic, SubOpts, SubMap)};
|
true ->
|
||||||
|
{[?RC_QUOTA_EXCEEDED|RcAcc], SubMap};
|
||||||
|
false ->
|
||||||
|
{[QoS|RcAcc], do_subscribe(ClientId, Username, Topic, SubOpts, SubMap)}
|
||||||
|
end;
|
||||||
({_Topic, #{rc := RC}}, {RcAcc, SubMap}) ->
|
({_Topic, #{rc := RC}}, {RcAcc, SubMap}) ->
|
||||||
{[RC|RcAcc], SubMap}
|
{[RC|RcAcc], SubMap}
|
||||||
end, {[], Subscriptions}, TopicFilters),
|
end, {[], Subscriptions}, TopicFilters),
|
||||||
|
@ -493,16 +507,23 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
||||||
noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1}));
|
noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1}));
|
||||||
|
|
||||||
%% PUBACK:
|
%% PUBACK:
|
||||||
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight,
|
||||||
noreply(
|
client_id = ClientId,
|
||||||
case emqx_inflight:contain(PacketId, Inflight) of
|
conn_pid = ConnPid}) ->
|
||||||
|
case emqx_inflight:contain(PacketId, Inflight) of
|
||||||
true ->
|
true ->
|
||||||
ensure_stats_timer(dequeue(acked(puback, PacketId, State)));
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
|
{value, {publish, {_, #message{topic = <<"$SYS/kickout">>}}, _Ts}} ->
|
||||||
|
ConnPid ! {shutdown, discard, {ClientId, undefined}},
|
||||||
|
{stop, {shutdown, discarded}, State};
|
||||||
|
_ ->
|
||||||
|
{noreply, ensure_stats_timer(dequeue(acked(puback, PacketId, State)))}
|
||||||
|
end;
|
||||||
false ->
|
false ->
|
||||||
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
|
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
|
||||||
ok = emqx_metrics:inc('packets.puback.missed'),
|
ok = emqx_metrics:inc('packets.puback.missed'),
|
||||||
State
|
{noreply, State}
|
||||||
end);
|
end;
|
||||||
|
|
||||||
%% PUBCOMP:
|
%% PUBCOMP:
|
||||||
handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
||||||
|
@ -603,7 +624,7 @@ handle_info({timeout, Timer, emit_stats},
|
||||||
{noreply, NewState#state{gc_state = GcState1}, hibernate};
|
{noreply, NewState#state{gc_state = GcState1}, hibernate};
|
||||||
{shutdown, Reason} ->
|
{shutdown, Reason} ->
|
||||||
?LOG(warning, "Shutdown exceptionally due to ~p", [Reason]),
|
?LOG(warning, "Shutdown exceptionally due to ~p", [Reason]),
|
||||||
shutdown(Reason, NewState)
|
self() ! {shutdown, Reason}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
|
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
|
||||||
|
@ -646,6 +667,13 @@ handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||||
[ConnPid, Pid, Reason]),
|
[ConnPid, Pid, Reason]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_info({shutdown, Reason}, State) ->
|
||||||
|
shutdown(Reason, State);
|
||||||
|
|
||||||
|
handle_info({kicked, ByPid}, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
|
||||||
|
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
|
||||||
|
{stop, {shutdown, discarded}, State};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -877,12 +905,14 @@ process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #sta
|
||||||
true -> process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State);
|
true -> process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State);
|
||||||
false -> process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State)
|
false -> process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State)
|
||||||
end;
|
end;
|
||||||
process_subopts([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, State = #state{}) ->
|
process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags, headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) ->
|
||||||
process_subopts(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, State);
|
process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session);
|
||||||
process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags}, State = #state{}) ->
|
process_subopts([{rap, _}|Opts], Msg = #message{headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) ->
|
||||||
process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, State);
|
process_subopts(Opts, Msg, Session);
|
||||||
process_subopts([{rap, _}|Opts], Msg, State) ->
|
process_subopts([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session = #session{}) ->
|
||||||
process_subopts(Opts, Msg, State);
|
process_subopts(Opts, Msg, Session);
|
||||||
|
process_subopts([{rap, _}|Opts], Msg = #message{flags = Flags}, Session) ->
|
||||||
|
process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session);
|
||||||
process_subopts([{subid, SubId}|Opts], Msg, State) ->
|
process_subopts([{subid, SubId}|Opts], Msg, State) ->
|
||||||
process_subopts(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).
|
process_subopts(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).
|
||||||
|
|
||||||
|
@ -1020,9 +1050,19 @@ drain_q(Cnt, Msgs, Q) ->
|
||||||
case emqx_mqueue:out(Q) of
|
case emqx_mqueue:out(Q) of
|
||||||
{empty, _Q} -> {Msgs, Q};
|
{empty, _Q} -> {Msgs, Q};
|
||||||
{{value, Msg}, Q1} ->
|
{{value, Msg}, Q1} ->
|
||||||
drain_q(Cnt-1, [Msg|Msgs], Q1)
|
case emqx_message:is_expired(Msg) of
|
||||||
|
true ->
|
||||||
|
ok = emqx_metrics:inc('messages.expired'),
|
||||||
|
drain_q(Cnt, Msgs, Q1);
|
||||||
|
false ->
|
||||||
|
drain_q(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-compile({inline, [acc_cnt/2]}).
|
||||||
|
acc_cnt(#message{qos = ?QOS_0}, Cnt) -> Cnt;
|
||||||
|
acc_cnt(_Msg, Cnt) -> Cnt - 1.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Ensure timers
|
%% Ensure timers
|
||||||
|
|
||||||
|
@ -1130,3 +1170,9 @@ do_subscribe(ClientId, Username, Topic, SubOpts, SubMap) ->
|
||||||
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
|
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
|
||||||
maps:put(Topic, SubOpts, SubMap)
|
maps:put(Topic, SubOpts, SubMap)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
exceeded_subscription_quota(0, _SubMap) ->
|
||||||
|
false;
|
||||||
|
exceeded_subscription_quota(Max, SubMap) ->
|
||||||
|
maps:size(SubMap) >= Max.
|
||||||
|
|
||||||
|
|
|
@ -117,7 +117,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
|
||||||
catch
|
catch
|
||||||
_:Error:_Stk ->
|
_:Error:_Stk ->
|
||||||
unregister_session(ClientId, SessPid),
|
unregister_session(ClientId, SessPid),
|
||||||
?LOG(warning, "Failed to discard ~p: ~p", [SessPid, Error])
|
?LOG(notice, "Failed to discard ~p: ~p", [SessPid, Error])
|
||||||
end
|
end
|
||||||
end, lookup_session_pids(ClientId)).
|
end, lookup_session_pids(ClientId)).
|
||||||
|
|
||||||
|
|
|
@ -14,8 +14,6 @@
|
||||||
|
|
||||||
-module(emqx_topic).
|
-module(emqx_topic).
|
||||||
|
|
||||||
-include("emqx_mqtt.hrl").
|
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([ match/2
|
-export([ match/2
|
||||||
, validate/1
|
, validate/1
|
||||||
|
@ -33,19 +31,23 @@
|
||||||
, parse/2
|
, parse/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export_type([ group/0
|
||||||
|
, topic/0
|
||||||
|
, word/0
|
||||||
|
, triple/0
|
||||||
|
]).
|
||||||
|
|
||||||
-type(group() :: binary()).
|
-type(group() :: binary()).
|
||||||
-type(topic() :: binary()).
|
-type(topic() :: binary()).
|
||||||
-type(word() :: '' | '+' | '#' | binary()).
|
-type(word() :: '' | '+' | '#' | binary()).
|
||||||
-type(words() :: list(word())).
|
-type(words() :: list(word())).
|
||||||
-opaque(triple() :: {root | binary(), word(), binary()}).
|
-opaque(triple() :: {root | binary(), word(), binary()}).
|
||||||
|
|
||||||
-export_type([group/0, topic/0, word/0, triple/0]).
|
|
||||||
|
|
||||||
-define(MAX_TOPIC_LEN, 4096).
|
-define(MAX_TOPIC_LEN, 4096).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Is wildcard topic?
|
%% @doc Is wildcard topic?
|
||||||
-spec(wildcard(topic() | words()) -> true | false).
|
-spec(wildcard(topic() | words()) -> true | false).
|
||||||
|
@ -60,7 +62,7 @@ wildcard(['+'|_]) ->
|
||||||
wildcard([_H|T]) ->
|
wildcard([_H|T]) ->
|
||||||
wildcard(T).
|
wildcard(T).
|
||||||
|
|
||||||
%% @doc Match Topic name with filter
|
%% @doc Match Topic name with filter.
|
||||||
-spec(match(Name, Filter) -> boolean() when
|
-spec(match(Name, Filter) -> boolean() when
|
||||||
Name :: topic() | words(),
|
Name :: topic() | words(),
|
||||||
Filter :: topic() | words()).
|
Filter :: topic() | words()).
|
||||||
|
@ -68,7 +70,7 @@ match(<<$$, _/binary>>, <<$+, _/binary>>) ->
|
||||||
false;
|
false;
|
||||||
match(<<$$, _/binary>>, <<$#, _/binary>>) ->
|
match(<<$$, _/binary>>, <<$#, _/binary>>) ->
|
||||||
false;
|
false;
|
||||||
match(Name, Filter) when is_binary(Name) and is_binary(Filter) ->
|
match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
|
||||||
match(words(Name), words(Filter));
|
match(words(Name), words(Filter));
|
||||||
match([], []) ->
|
match([], []) ->
|
||||||
true;
|
true;
|
||||||
|
@ -95,13 +97,15 @@ validate({Type, Topic}) when Type =:= name; Type =:= filter ->
|
||||||
-spec(validate(name | filter, topic()) -> true).
|
-spec(validate(name | filter, topic()) -> true).
|
||||||
validate(_, <<>>) ->
|
validate(_, <<>>) ->
|
||||||
error(empty_topic);
|
error(empty_topic);
|
||||||
validate(_, Topic) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) ->
|
validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
|
||||||
error(topic_too_long);
|
error(topic_too_long);
|
||||||
validate(filter, Topic) when is_binary(Topic) ->
|
validate(filter, Topic) when is_binary(Topic) ->
|
||||||
validate2(words(Topic));
|
validate2(words(Topic));
|
||||||
validate(name, Topic) when is_binary(Topic) ->
|
validate(name, Topic) when is_binary(Topic) ->
|
||||||
Words = words(Topic),
|
Words = words(Topic),
|
||||||
validate2(Words) and (not wildcard(Words)).
|
validate2(Words)
|
||||||
|
andalso (not wildcard(Words))
|
||||||
|
orelse error(topic_name_error).
|
||||||
|
|
||||||
validate2([]) ->
|
validate2([]) ->
|
||||||
true;
|
true;
|
||||||
|
@ -123,7 +127,7 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
|
||||||
validate3(<<_/utf8, Rest/binary>>) ->
|
validate3(<<_/utf8, Rest/binary>>) ->
|
||||||
validate3(Rest).
|
validate3(Rest).
|
||||||
|
|
||||||
%% @doc Topic to triples
|
%% @doc Topic to triples.
|
||||||
-spec(triples(topic()) -> list(triple())).
|
-spec(triples(topic()) -> list(triple())).
|
||||||
triples(Topic) when is_binary(Topic) ->
|
triples(Topic) when is_binary(Topic) ->
|
||||||
triples(words(Topic), root, []).
|
triples(words(Topic), root, []).
|
||||||
|
@ -206,27 +210,29 @@ join(Words) ->
|
||||||
end, {true, <<>>}, [bin(W) || W <- Words]),
|
end, {true, <<>>}, [bin(W) || W <- Words]),
|
||||||
Bin.
|
Bin.
|
||||||
|
|
||||||
-spec(parse(topic()) -> {topic(), #{}}).
|
-spec(parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}).
|
||||||
parse(Topic) when is_binary(Topic) ->
|
parse(TopicFilter) when is_binary(TopicFilter) ->
|
||||||
parse(Topic, #{}).
|
parse(TopicFilter, #{});
|
||||||
|
parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
|
||||||
|
parse(TopicFilter, Options).
|
||||||
|
|
||||||
parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) ->
|
-spec(parse(topic(), map()) -> {topic(), map()}).
|
||||||
error({invalid_topic, Topic});
|
parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
|
||||||
parse(Topic = <<?SHARE, "/", _/binary>>, #{share := _Group}) ->
|
error({invalid_topic_filter, TopicFilter});
|
||||||
error({invalid_topic, Topic});
|
parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
|
||||||
parse(<<"$queue/", Topic1/binary>>, Options) ->
|
error({invalid_topic_filter, TopicFilter});
|
||||||
parse(Topic1, maps:put(share, <<"$queue">>, Options));
|
parse(<<"$queue/", TopicFilter/binary>>, Options) ->
|
||||||
parse(Topic = <<?SHARE, "/", Topic1/binary>>, Options) ->
|
parse(TopicFilter, Options#{share => <<"$queue">>});
|
||||||
case binary:split(Topic1, <<"/">>) of
|
parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
|
||||||
[<<>>] -> error({invalid_topic, Topic});
|
case binary:split(Rest, <<"/">>) of
|
||||||
[_] -> error({invalid_topic, Topic});
|
[_Any] -> error({invalid_topic_filter, TopicFilter});
|
||||||
[Group, Topic2] ->
|
[ShareName, Filter] ->
|
||||||
case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of
|
case binary:match(ShareName, [<<"+">>, <<"#">>]) of
|
||||||
nomatch -> {Topic2, maps:put(share, Group, Options)};
|
nomatch -> parse(Filter, Options#{share => ShareName});
|
||||||
_ -> error({invalid_topic, Topic})
|
_ -> error({invalid_topic_filter, TopicFilter})
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
parse(Topic, Options = #{qos := QoS}) ->
|
parse(TopicFilter, Options = #{qos := QoS}) ->
|
||||||
{Topic, Options#{rc => QoS}};
|
{TopicFilter, Options#{rc => QoS}};
|
||||||
parse(Topic, Options) ->
|
parse(TopicFilter, Options) ->
|
||||||
{Topic, Options}.
|
{TopicFilter, Options}.
|
|
@ -14,34 +14,19 @@
|
||||||
|
|
||||||
-module(emqx_tracer).
|
-module(emqx_tracer).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
|
|
||||||
-logger_header("[Tracer]").
|
-logger_header("[Tracer]").
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([start_link/0]).
|
|
||||||
|
|
||||||
-export([ trace/2
|
-export([ trace/2
|
||||||
, start_trace/3
|
, start_trace/3
|
||||||
, lookup_traces/0
|
, lookup_traces/0
|
||||||
, stop_trace/1
|
, stop_trace/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
-type(trace_who() :: {client_id | topic, binary() | list()}).
|
||||||
-export([ init/1
|
|
||||||
, handle_call/3
|
|
||||||
, handle_cast/2
|
|
||||||
, handle_info/2
|
|
||||||
, terminate/2
|
|
||||||
, code_change/3
|
|
||||||
]).
|
|
||||||
|
|
||||||
-record(state, {traces}).
|
|
||||||
|
|
||||||
-type(trace_who() :: {client_id | topic, binary()}).
|
|
||||||
|
|
||||||
-define(TRACER, ?MODULE).
|
-define(TRACER, ?MODULE).
|
||||||
-define(FORMAT, {emqx_logger_formatter,
|
-define(FORMAT, {emqx_logger_formatter,
|
||||||
|
@ -55,120 +40,100 @@
|
||||||
[peername," "],
|
[peername," "],
|
||||||
[]}]},
|
[]}]},
|
||||||
msg,"\n"]}}).
|
msg,"\n"]}}).
|
||||||
|
-define(TOPIC_TRACE_ID(T), "trace_topic_"++T).
|
||||||
|
-define(CLIENT_TRACE_ID(C), "trace_clientid_"++C).
|
||||||
|
-define(TOPIC_TRACE(T), {topic,T}).
|
||||||
|
-define(CLIENT_TRACE(C), {client_id,C}).
|
||||||
|
|
||||||
|
-define(is_log_level(L),
|
||||||
|
L =:= emergency orelse
|
||||||
|
L =:= alert orelse
|
||||||
|
L =:= critical orelse
|
||||||
|
L =:= error orelse
|
||||||
|
L =:= warning orelse
|
||||||
|
L =:= notice orelse
|
||||||
|
L =:= info orelse
|
||||||
|
L =:= debug).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
|
||||||
start_link() ->
|
|
||||||
gen_server:start_link({local, ?TRACER}, ?MODULE, [], []).
|
|
||||||
|
|
||||||
trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
|
trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
|
||||||
%% Dont' trace '$SYS' publish
|
%% Do not trace '$SYS' publish
|
||||||
ignore;
|
ignore;
|
||||||
trace(publish, #message{from = From, topic = Topic, payload = Payload})
|
trace(publish, #message{from = From, topic = Topic, payload = Payload})
|
||||||
when is_binary(From); is_atom(From) ->
|
when is_binary(From); is_atom(From) ->
|
||||||
emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]).
|
emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, "PUBLISH to ~s: ~p", [Topic, Payload]).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Start/Stop trace
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @doc Start to trace client_id or topic.
|
%% @doc Start to trace client_id or topic.
|
||||||
-spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
|
-spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}).
|
||||||
start_trace({client_id, ClientId}, Level, LogFile) ->
|
start_trace(Who, all, LogFile) ->
|
||||||
do_start_trace({client_id, ClientId}, Level, LogFile);
|
start_trace(Who, debug, LogFile);
|
||||||
start_trace({topic, Topic}, Level, LogFile) ->
|
start_trace(Who, Level, LogFile) ->
|
||||||
do_start_trace({topic, Topic}, Level, LogFile).
|
case ?is_log_level(Level) of
|
||||||
|
true ->
|
||||||
do_start_trace(Who, Level, LogFile) ->
|
#{level := PrimaryLevel} = logger:get_primary_config(),
|
||||||
#{level := PrimaryLevel} = logger:get_primary_config(),
|
try logger:compare_levels(Level, PrimaryLevel) of
|
||||||
try logger:compare_levels(log_level(Level), PrimaryLevel) of
|
lt ->
|
||||||
lt ->
|
{error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])};
|
||||||
{error, io_lib:format("Cannot trace at a log level (~s) lower than the primary log level (~s)", [Level, PrimaryLevel])};
|
_GtOrEq ->
|
||||||
_GtOrEq ->
|
install_trace_handler(Who, Level, LogFile)
|
||||||
gen_server:call(?MODULE, {start_trace, Who, Level, LogFile}, 5000)
|
catch
|
||||||
catch
|
_:Error ->
|
||||||
_:Error ->
|
{error, Error}
|
||||||
{error, Error}
|
end;
|
||||||
|
false -> {error, {invalid_log_level, Level}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Stop tracing client_id or topic.
|
%% @doc Stop tracing client_id or topic.
|
||||||
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
|
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
|
||||||
stop_trace({client_id, ClientId}) ->
|
stop_trace(Who) ->
|
||||||
gen_server:call(?MODULE, {stop_trace, {client_id, ClientId}});
|
uninstall_trance_handler(Who).
|
||||||
stop_trace({topic, Topic}) ->
|
|
||||||
gen_server:call(?MODULE, {stop_trace, {topic, Topic}}).
|
|
||||||
|
|
||||||
%% @doc Lookup all traces
|
%% @doc Lookup all traces
|
||||||
-spec(lookup_traces() -> [{Who :: trace_who(), LogFile :: string()}]).
|
-spec(lookup_traces() -> [{Who :: trace_who(), LogFile :: string()}]).
|
||||||
lookup_traces() ->
|
lookup_traces() ->
|
||||||
gen_server:call(?TRACER, lookup_traces).
|
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers()).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
install_trace_handler(Who, Level, LogFile) ->
|
||||||
%% gen_server callbacks
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
init([]) ->
|
|
||||||
{ok, #state{traces = #{}}}.
|
|
||||||
|
|
||||||
handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = Traces}) ->
|
|
||||||
case logger:add_handler(handler_id(Who), logger_disk_log_h,
|
case logger:add_handler(handler_id(Who), logger_disk_log_h,
|
||||||
#{level => Level,
|
#{level => Level,
|
||||||
formatter => ?FORMAT,
|
formatter => ?FORMAT,
|
||||||
filesync_repeat_interval => no_repeat,
|
filesync_repeat_interval => no_repeat,
|
||||||
config => #{type => halt, file => LogFile},
|
config => #{type => halt, file => LogFile},
|
||||||
filter_default => stop,
|
filter_default => stop,
|
||||||
filters => [{meta_key_filter,
|
filters => [{meta_key_filter,
|
||||||
{fun filter_by_meta_key/2, Who} }]}) of
|
{fun filter_by_meta_key/2, Who}}]})
|
||||||
|
of
|
||||||
ok ->
|
ok ->
|
||||||
?LOG(info, "Start trace for ~p", [Who]),
|
?LOG(info, "Start trace for ~p", [Who]);
|
||||||
{reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}};
|
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
|
?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
|
||||||
{reply, {error, Reason}, State}
|
{error, Reason}
|
||||||
end;
|
end.
|
||||||
|
|
||||||
handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) ->
|
uninstall_trance_handler(Who) ->
|
||||||
case maps:find(Who, Traces) of
|
case logger:remove_handler(handler_id(Who)) of
|
||||||
{ok, _LogFile} ->
|
ok ->
|
||||||
case logger:remove_handler(handler_id(Who)) of
|
?LOG(info, "Stop trace for ~p", [Who]);
|
||||||
ok ->
|
{error, Reason} ->
|
||||||
?LOG(info, "Stop trace for ~p", [Who]);
|
?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]),
|
||||||
{error, Reason} ->
|
{error, Reason}
|
||||||
?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason])
|
end.
|
||||||
end,
|
|
||||||
{reply, ok, State#state{traces = maps:remove(Who, Traces)}};
|
|
||||||
error ->
|
|
||||||
{reply, {error, not_found}, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_call(lookup_traces, _From, State = #state{traces = Traces}) ->
|
filter_traces({Id, Level, Dst}, Acc) ->
|
||||||
{reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State};
|
case atom_to_list(Id) of
|
||||||
|
?TOPIC_TRACE_ID(T)->
|
||||||
|
[{?TOPIC_TRACE(T), {Level,Dst}} | Acc];
|
||||||
|
?CLIENT_TRACE_ID(C) ->
|
||||||
|
[{?CLIENT_TRACE(C), {Level,Dst}} | Acc];
|
||||||
|
_ -> Acc
|
||||||
|
end.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handler_id(?TOPIC_TRACE(Topic)) ->
|
||||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
list_to_atom(?TOPIC_TRACE_ID(str(Topic)));
|
||||||
{reply, ignored, State}.
|
handler_id(?CLIENT_TRACE(ClientId)) ->
|
||||||
|
list_to_atom(?CLIENT_TRACE_ID(str(ClientId))).
|
||||||
handle_cast(Msg, State) ->
|
|
||||||
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
handler_id({topic, Topic}) ->
|
|
||||||
list_to_atom("topic_" ++ binary_to_list(Topic));
|
|
||||||
handler_id({client_id, ClientId}) ->
|
|
||||||
list_to_atom("clientid_" ++ binary_to_list(ClientId)).
|
|
||||||
|
|
||||||
filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
|
filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
|
||||||
case maps:find(MetaKey, Meta) of
|
case maps:find(MetaKey, Meta) of
|
||||||
|
@ -181,13 +146,6 @@ filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
|
||||||
_ -> ignore
|
_ -> ignore
|
||||||
end.
|
end.
|
||||||
|
|
||||||
log_level(emergency) -> emergency;
|
str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
|
||||||
log_level(alert) -> alert;
|
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
||||||
log_level(critical) -> critical;
|
str(Str) when is_list(Str) -> Str.
|
||||||
log_level(error) -> error;
|
|
||||||
log_level(warning) -> warning;
|
|
||||||
log_level(notice) -> notice;
|
|
||||||
log_level(info) -> info;
|
|
||||||
log_level(debug) -> debug;
|
|
||||||
log_level(all) -> debug;
|
|
||||||
log_level(_) -> throw(invalid_log_level).
|
|
||||||
|
|
|
@ -70,6 +70,7 @@ receive_messages(Count, Msgs) ->
|
||||||
basic_test(_Config) ->
|
basic_test(_Config) ->
|
||||||
Topic = nth(1, ?TOPICS),
|
Topic = nth(1, ?TOPICS),
|
||||||
ct:print("Basic test starting"),
|
ct:print("Basic test starting"),
|
||||||
|
init_caps(),
|
||||||
{ok, C} = emqx_client:start_link(),
|
{ok, C} = emqx_client:start_link(),
|
||||||
{ok, _} = emqx_client:connect(C),
|
{ok, _} = emqx_client:connect(C),
|
||||||
{ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1),
|
{ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1),
|
||||||
|
@ -81,6 +82,7 @@ basic_test(_Config) ->
|
||||||
ok = emqx_client:disconnect(C).
|
ok = emqx_client:disconnect(C).
|
||||||
|
|
||||||
will_message_test(_Config) ->
|
will_message_test(_Config) ->
|
||||||
|
init_caps(),
|
||||||
{ok, C1} = emqx_client:start_link([{clean_start, true},
|
{ok, C1} = emqx_client:start_link([{clean_start, true},
|
||||||
{will_topic, nth(3, ?TOPICS)},
|
{will_topic, nth(3, ?TOPICS)},
|
||||||
{will_payload, <<"client disconnected">>},
|
{will_payload, <<"client disconnected">>},
|
||||||
|
@ -99,10 +101,10 @@ will_message_test(_Config) ->
|
||||||
ct:print("Will message test succeeded").
|
ct:print("Will message test succeeded").
|
||||||
|
|
||||||
offline_message_queueing_test(_) ->
|
offline_message_queueing_test(_) ->
|
||||||
|
init_caps(),
|
||||||
{ok, C1} = emqx_client:start_link([{clean_start, false},
|
{ok, C1} = emqx_client:start_link([{clean_start, false},
|
||||||
{client_id, <<"c1">>}]),
|
{client_id, <<"c1">>}]),
|
||||||
{ok, _} = emqx_client:connect(C1),
|
{ok, _} = emqx_client:connect(C1),
|
||||||
|
|
||||||
{ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
|
{ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
|
||||||
ok = emqx_client:disconnect(C1),
|
ok = emqx_client:disconnect(C1),
|
||||||
{ok, C2} = emqx_client:start_link([{clean_start, true},
|
{ok, C2} = emqx_client:start_link([{clean_start, true},
|
||||||
|
@ -123,6 +125,7 @@ offline_message_queueing_test(_) ->
|
||||||
?assertEqual(3, length(receive_messages(3))).
|
?assertEqual(3, length(receive_messages(3))).
|
||||||
|
|
||||||
overlapping_subscriptions_test(_) ->
|
overlapping_subscriptions_test(_) ->
|
||||||
|
init_caps(),
|
||||||
{ok, C} = emqx_client:start_link([]),
|
{ok, C} = emqx_client:start_link([]),
|
||||||
{ok, _} = emqx_client:connect(C),
|
{ok, _} = emqx_client:connect(C),
|
||||||
|
|
||||||
|
@ -163,6 +166,7 @@ overlapping_subscriptions_test(_) ->
|
||||||
|
|
||||||
redelivery_on_reconnect_test(_) ->
|
redelivery_on_reconnect_test(_) ->
|
||||||
ct:print("Redelivery on reconnect test starting"),
|
ct:print("Redelivery on reconnect test starting"),
|
||||||
|
init_caps(),
|
||||||
{ok, C1} = emqx_client:start_link([{clean_start, false},
|
{ok, C1} = emqx_client:start_link([{clean_start, false},
|
||||||
{client_id, <<"c">>}]),
|
{client_id, <<"c">>}]),
|
||||||
{ok, _} = emqx_client:connect(C1),
|
{ok, _} = emqx_client:connect(C1),
|
||||||
|
@ -194,6 +198,7 @@ redelivery_on_reconnect_test(_) ->
|
||||||
|
|
||||||
dollar_topics_test(_) ->
|
dollar_topics_test(_) ->
|
||||||
ct:print("$ topics test starting"),
|
ct:print("$ topics test starting"),
|
||||||
|
init_caps(),
|
||||||
{ok, C} = emqx_client:start_link([{clean_start, true},
|
{ok, C} = emqx_client:start_link([{clean_start, true},
|
||||||
{keepalive, 0}]),
|
{keepalive, 0}]),
|
||||||
{ok, _} = emqx_client:connect(C),
|
{ok, _} = emqx_client:connect(C),
|
||||||
|
@ -205,3 +210,13 @@ dollar_topics_test(_) ->
|
||||||
?assertEqual(0, length(receive_messages(1))),
|
?assertEqual(0, length(receive_messages(1))),
|
||||||
ok = emqx_client:disconnect(C),
|
ok = emqx_client:disconnect(C),
|
||||||
ct:print("$ topics test succeeded").
|
ct:print("$ topics test succeeded").
|
||||||
|
|
||||||
|
init_caps() ->
|
||||||
|
Caps = #{max_qos_allowed => 2,
|
||||||
|
max_topic_levels => 0,
|
||||||
|
mqtt_shared_subscription => true,
|
||||||
|
mqtt_wildcard_subscription => true,
|
||||||
|
max_topic_alias => 0,
|
||||||
|
mqtt_retain_available => true},
|
||||||
|
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
|
||||||
|
timer:sleep(100).
|
|
@ -27,38 +27,19 @@ all() -> [t_get_set_caps, t_check_pub, t_check_sub].
|
||||||
|
|
||||||
t_get_set_caps(_) ->
|
t_get_set_caps(_) ->
|
||||||
{ok, _} = emqx_zone:start_link(),
|
{ok, _} = emqx_zone:start_link(),
|
||||||
Caps = #{
|
|
||||||
max_packet_size => ?MAX_PACKET_SIZE,
|
PubCaps = emqx_mqtt_caps:get_caps(external, publish),
|
||||||
max_clientid_len => ?MAX_CLIENTID_LEN,
|
|
||||||
max_topic_alias => 0,
|
|
||||||
max_topic_levels => 0,
|
|
||||||
max_qos_allowed => ?QOS_2,
|
|
||||||
mqtt_retain_available => true,
|
|
||||||
mqtt_shared_subscription => true,
|
|
||||||
mqtt_wildcard_subscription => true
|
|
||||||
},
|
|
||||||
Caps2 = Caps#{max_packet_size => 1048576},
|
|
||||||
case emqx_mqtt_caps:get_caps(zone) of
|
|
||||||
Caps -> ok;
|
|
||||||
Caps2 -> ok
|
|
||||||
end,
|
|
||||||
PubCaps = #{
|
|
||||||
max_qos_allowed => ?QOS_2,
|
|
||||||
mqtt_retain_available => true,
|
|
||||||
max_topic_alias => 0
|
|
||||||
},
|
|
||||||
PubCaps = emqx_mqtt_caps:get_caps(zone, publish),
|
|
||||||
NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1},
|
NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1},
|
||||||
emqx_zone:set_env(zone, '$mqtt_pub_caps', NewPubCaps),
|
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(NewPubCaps)],
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
NewPubCaps = emqx_mqtt_caps:get_caps(zone, publish),
|
NewPubCaps = emqx_mqtt_caps:get_caps(external, publish),
|
||||||
SubCaps = #{
|
|
||||||
max_topic_levels => 0,
|
SubCaps = emqx_mqtt_caps:get_caps(external, subscribe),
|
||||||
max_qos_allowed => ?QOS_2,
|
NewSubCaps = SubCaps#{max_topic_levels => 2},
|
||||||
mqtt_shared_subscription => true,
|
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(NewSubCaps)],
|
||||||
mqtt_wildcard_subscription => true
|
timer:sleep(100),
|
||||||
},
|
NewSubCaps = emqx_mqtt_caps:get_caps(external, subscribe),
|
||||||
SubCaps = emqx_mqtt_caps:get_caps(zone, subscribe),
|
|
||||||
emqx_zone:stop().
|
emqx_zone:stop().
|
||||||
|
|
||||||
t_check_pub(_) ->
|
t_check_pub(_) ->
|
||||||
|
@ -68,35 +49,34 @@ t_check_pub(_) ->
|
||||||
mqtt_retain_available => false,
|
mqtt_retain_available => false,
|
||||||
max_topic_alias => 4
|
max_topic_alias => 4
|
||||||
},
|
},
|
||||||
emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
|
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(PubCaps)],
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
ct:log("~p", [emqx_mqtt_caps:get_caps(zone, publish)]),
|
ct:log("~p", [emqx_mqtt_caps:get_caps(external, publish)]),
|
||||||
BadPubProps1 = #{
|
BadPubProps1 = #{
|
||||||
qos => ?QOS_2,
|
qos => ?QOS_2,
|
||||||
retain => false
|
retain => false
|
||||||
},
|
},
|
||||||
{error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps1),
|
{error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(external, BadPubProps1),
|
||||||
BadPubProps2 = #{
|
BadPubProps2 = #{
|
||||||
qos => ?QOS_1,
|
qos => ?QOS_1,
|
||||||
retain => true
|
retain => true
|
||||||
},
|
},
|
||||||
{error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps2),
|
{error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(external, BadPubProps2),
|
||||||
BadPubProps3 = #{
|
BadPubProps3 = #{
|
||||||
qos => ?QOS_1,
|
qos => ?QOS_1,
|
||||||
retain => false,
|
retain => false,
|
||||||
topic_alias => 5
|
topic_alias => 5
|
||||||
},
|
},
|
||||||
{error, ?RC_TOPIC_ALIAS_INVALID} = emqx_mqtt_caps:check_pub(zone, BadPubProps3),
|
{error, ?RC_TOPIC_ALIAS_INVALID} = emqx_mqtt_caps:check_pub(external, BadPubProps3),
|
||||||
PubProps = #{
|
PubProps = #{
|
||||||
qos => ?QOS_1,
|
qos => ?QOS_1,
|
||||||
retain => false
|
retain => false
|
||||||
},
|
},
|
||||||
ok = emqx_mqtt_caps:check_pub(zone, PubProps),
|
ok = emqx_mqtt_caps:check_pub(external, PubProps),
|
||||||
emqx_zone:stop().
|
emqx_zone:stop().
|
||||||
|
|
||||||
t_check_sub(_) ->
|
t_check_sub(_) ->
|
||||||
{ok, _} = emqx_zone:start_link(),
|
{ok, _} = emqx_zone:start_link(),
|
||||||
|
|
||||||
Opts = #{qos => ?QOS_2, share => true, rc => 0},
|
Opts = #{qos => ?QOS_2, share => true, rc => 0},
|
||||||
Caps = #{
|
Caps = #{
|
||||||
max_topic_levels => 0,
|
max_topic_levels => 0,
|
||||||
|
@ -104,6 +84,8 @@ t_check_sub(_) ->
|
||||||
mqtt_shared_subscription => true,
|
mqtt_shared_subscription => true,
|
||||||
mqtt_wildcard_subscription => true
|
mqtt_wildcard_subscription => true
|
||||||
},
|
},
|
||||||
|
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
|
||||||
|
timer:sleep(100),
|
||||||
|
|
||||||
ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]),
|
ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]),
|
||||||
ok = do_check_sub(Caps#{max_qos_allowed => ?QOS_1}, [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{qos => ?QOS_1}}]),
|
ok = do_check_sub(Caps#{max_qos_allowed => ?QOS_1}, [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{qos => ?QOS_1}}]),
|
||||||
|
@ -122,10 +104,10 @@ t_check_sub(_) ->
|
||||||
|
|
||||||
|
|
||||||
do_check_sub(TopicFilters, Topics) ->
|
do_check_sub(TopicFilters, Topics) ->
|
||||||
{ok, Topics} = emqx_mqtt_caps:check_sub(zone, TopicFilters),
|
{ok, Topics} = emqx_mqtt_caps:check_sub(external, TopicFilters),
|
||||||
ok.
|
ok.
|
||||||
do_check_sub(Caps, TopicFilters, Topics) ->
|
do_check_sub(Caps, TopicFilters, Topics) ->
|
||||||
emqx_zone:set_env(zone, '$mqtt_sub_caps', Caps),
|
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(Caps)],
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
{_, Topics} = emqx_mqtt_caps:check_sub(zone, TopicFilters),
|
{_, Topics} = emqx_mqtt_caps:check_sub(external, TopicFilters),
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -56,7 +56,7 @@ groups() ->
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
|
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
|
||||||
MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()),
|
MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()),
|
||||||
emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}),
|
[emqx_zone:set_env(external, Key, Val) ||{Key, Val} <- maps:to_list(MqttCaps#{max_topic_alias => 20})],
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
|
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
all() -> [ignore_loop, t_session_all].
|
all() -> [ignore_loop, t_session_all, t_message_expiry_interval_1, t_message_expiry_interval_2].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_ct_helpers:start_apps([]),
|
emqx_ct_helpers:start_apps([]),
|
||||||
|
@ -66,3 +66,63 @@ t_session_all(_) ->
|
||||||
timer:sleep(200),
|
timer:sleep(200),
|
||||||
[] = emqx:subscriptions(SPid),
|
[] = emqx:subscriptions(SPid),
|
||||||
emqx_mock_client:close_session(ConnPid).
|
emqx_mock_client:close_session(ConnPid).
|
||||||
|
|
||||||
|
t_message_expiry_interval_1(_) ->
|
||||||
|
ClientA = message_expiry_interval_init(),
|
||||||
|
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]].
|
||||||
|
|
||||||
|
t_message_expiry_interval_2(_) ->
|
||||||
|
ClientA = message_expiry_interval_init(),
|
||||||
|
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]].
|
||||||
|
|
||||||
|
message_expiry_interval_init() ->
|
||||||
|
{ok, ClientA} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
|
{ok, ClientB} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
|
{ok, _} = emqx_client:connect(ClientA),
|
||||||
|
{ok, _} = emqx_client:connect(ClientB),
|
||||||
|
%% subscribe and disconnect client-b
|
||||||
|
emqx_client:subscribe(ClientB, <<"t/a">>, 1),
|
||||||
|
emqx_client:stop(ClientB),
|
||||||
|
ClientA.
|
||||||
|
|
||||||
|
message_expiry_interval_exipred(ClientA, QoS) ->
|
||||||
|
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||||
|
%% publish to t/a and waiting for the message expired
|
||||||
|
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||||
|
ct:sleep(2000),
|
||||||
|
|
||||||
|
%% resume the session for client-b
|
||||||
|
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
|
{ok, _} = emqx_client:connect(ClientB1),
|
||||||
|
|
||||||
|
%% verify client-b could not receive the publish message
|
||||||
|
receive
|
||||||
|
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
|
||||||
|
ct:fail(should_have_expired)
|
||||||
|
after 300 ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
emqx_client:stop(ClientB1).
|
||||||
|
|
||||||
|
message_expiry_interval_not_exipred(ClientA, QoS) ->
|
||||||
|
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||||
|
%% publish to t/a
|
||||||
|
emqx_client:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||||
|
|
||||||
|
%% wait for 1s and then resume the session for client-b, the message should not expires
|
||||||
|
%% as Message-Expiry-Interval = 20s
|
||||||
|
ct:sleep(1000),
|
||||||
|
{ok, ClientB1} = emqx_client:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
|
{ok, _} = emqx_client:connect(ClientB1),
|
||||||
|
|
||||||
|
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
|
||||||
|
receive
|
||||||
|
{publish,#{client_pid := ClientB1, topic := <<"t/a">>,
|
||||||
|
properties := #{'Message-Expiry-Interval' := MsgExpItvl}}}
|
||||||
|
when MsgExpItvl < 20 -> ok;
|
||||||
|
{publish, _} = Msg ->
|
||||||
|
ct:fail({incorrect_publish, Msg})
|
||||||
|
after 300 ->
|
||||||
|
ct:fail(no_publish_received)
|
||||||
|
end,
|
||||||
|
emqx_client:stop(ClientB1).
|
||||||
|
|
|
@ -43,7 +43,7 @@ start_traces(_Config) ->
|
||||||
emqx_logger:set_log_level(debug),
|
emqx_logger:set_log_level(debug),
|
||||||
ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
|
ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
|
||||||
ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
|
ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
|
||||||
{error, invalid_log_level} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"),
|
{error, {invalid_log_level, bad_level}} = emqx_tracer:start_trace({client_id, <<"client3">>}, bad_level, "tmp/client3.log"),
|
||||||
ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
|
ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
|
||||||
ct:sleep(100),
|
ct:sleep(100),
|
||||||
|
|
||||||
|
@ -53,9 +53,9 @@ start_traces(_Config) ->
|
||||||
?assert(filelib:is_regular("tmp/topic_trace.log")),
|
?assert(filelib:is_regular("tmp/topic_trace.log")),
|
||||||
|
|
||||||
%% Get current traces
|
%% Get current traces
|
||||||
?assertEqual([{{client_id,<<"client">>},{debug,"tmp/client.log"}},
|
?assertEqual([{{client_id,"client"},{debug,"tmp/client.log"}},
|
||||||
{{client_id,<<"client2">>},{all,"tmp/client2.log"}},
|
{{client_id,"client2"},{debug,"tmp/client2.log"}},
|
||||||
{{topic,<<"a/#">>},{all,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
|
{{topic,"a/#"},{debug,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
|
||||||
|
|
||||||
%% set the overall log level to debug
|
%% set the overall log level to debug
|
||||||
emqx_logger:set_log_level(debug),
|
emqx_logger:set_log_level(debug),
|
||||||
|
|
Loading…
Reference in New Issue