Merge pull request #4116 from z8674558/merge430

merge 4.3.0 to 5.0
This commit is contained in:
Zaiming Shi 2021-01-29 14:46:47 +01:00 committed by GitHub
commit fc14e86829
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 370 additions and 322 deletions

View File

@ -127,7 +127,7 @@ running_test(){
IDLE_TIME=$((IDLE_TIME+1))
done
pytest -v /paho-mqtt-testing/interoperability/test_client/V5/test_connect.py::test_basic
emqx stop || kill $(ps -ef |grep emqx | grep beam.smp |awk '{print $2}')
emqx stop || kill $(ps -ef | grep -E '\-progname\s.+emqx\s' |awk '{print $2}')
if [ $(sed -n '/^ID=/p' /etc/os-release | sed -r 's/ID=(.*)/\1/g' | sed 's/"//g') = ubuntu ] \
|| [ $(sed -n '/^ID=/p' /etc/os-release | sed -r 's/ID=(.*)/\1/g' | sed 's/"//g') = debian ] \

View File

@ -17,7 +17,7 @@ services:
- |
sed -i "s 127.0.0.1 $$(ip route show |grep "link" |awk '{print $$1}') g" /opt/emqx/etc/acl.conf
sed -i '/emqx_telemetry/d' /opt/emqx/data/loaded_plugins
/usr/bin/start.sh
/opt/emqx/bin/emqx foreground
healthcheck:
test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]
interval: 5s
@ -44,7 +44,7 @@ services:
- |
sed -i "s 127.0.0.1 $$(ip route show |grep "link" |awk '{print $$1}') g" /opt/emqx/etc/acl.conf
sed -i '/emqx_telemetry/d' /opt/emqx/data/loaded_plugins
/usr/bin/start.sh
/opt/emqx/bin/emqx foreground
healthcheck:
test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]
interval: 5s

View File

@ -93,6 +93,19 @@ jobs:
unzip _packages/emqx/$pkg_name
gsed -i '/emqx_telemetry/d' ./emqx/data/loaded_plugins
./emqx/bin/emqx start || cat emqx/log/erlang.log.1
ready='no'
for i in {1..10}; do
if curl -fs 127.0.0.1:18083 > /dev/null; then
ready='yes'
break
fi
sleep 1
done
if [ "$ready" != "yes" ]; then
echo "Timed out waiting for emqx to be ready"
cat emqx/log/erlang.log.1
exit 1
fi
./emqx/bin/emqx_ctl status
./emqx/bin/emqx stop
rm -rf emqx

View File

@ -78,10 +78,10 @@ jobs:
if: matrix.connect_type == 'tls'
run: |
docker-compose -f .ci/compatibility_tests/docker-compose-mongo-tls.yaml up -d
sed -i 's|^[#[:space:]]*auth.mongo.ssl[[:space:]]*=.*|auth.mongo.ssl.enable = on|g' apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf
sed -i 's|^[#[:space:]]*auth.mongo.cacertfile[[:space:]]*=.*|auth.mongo.cacertfile = "/emqx/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE_data/ca.pem"|g' apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf
sed -i 's|^[#[:space:]]*auth.mongo.certfile[[:space:]]*=.*|auth.mongo.certfile = "/emqx/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE_data/client-cert.pem"|g' apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf
sed -i 's|^[#[:space:]]*auth.mongo.keyfile[[:space:]]*=.*|auth.mongo.keyfile = "/emqx/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE_data/client-key.pem"|g' apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf
sed -i 's|^[#[:space:]]*auth.mongo.ssl.enable[[:space:]]*=.*|auth.mongo.ssl.enable = on|g' apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf
sed -i 's|^[#[:space:]]*auth.mongo.cacertfile[[:space:]]*=.*|auth.mongo.cacertfile = \"/emqx/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE_data/ca.pem\"|g' apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf
sed -i 's|^[#[:space:]]*auth.mongo.certfile[[:space:]]*=.*|auth.mongo.certfile = \"/emqx/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE_data/client-cert.pem\"|g' apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf
sed -i 's|^[#[:space:]]*auth.mongo.keyfile[[:space:]]*=.*|auth.mongo.keyfile = \"/emqx/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE_data/client-key.pem\"|g' apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf
- name: setup
env:
MONGO_TAG: ${{ matrix.mongo_tag }}
@ -121,7 +121,7 @@ jobs:
- ipv4
- ipv6
connect_type:
- tls
# - tls
- tcp
steps:
@ -132,10 +132,10 @@ jobs:
if: matrix.connect_type == 'tls'
run: |
docker-compose -f .ci/compatibility_tests/docker-compose-mysql-tls.yaml up -d
sed -i 's|^[#[:space:]]*auth.mysql.ssl[[:space:]]*=.*|auth.mysql.ssl.enable = on|g' apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf
sed -i 's|^[#[:space:]]*auth.mysql.cacertfile[[:space:]]*=.*|auth.mysql.cacertfile = \"/emqx/apps/emqx_auth_mysql/test/emqx_auth_mysql_SUITE_data/ca.pem\"|g' apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf
sed -i 's|^[#[:space:]]*auth.mysql.certfile[[:space:]]*=.*|auth.mysql.certfile = \"/emqx/apps/emqx_auth_mysql/test/emqx_auth_mysql_SUITE_data/client-cert.pem\"|g' apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf
sed -i 's|^[#[:space:]]*auth.mysql.keyfile[[:space:]]*=.*|auth.mysql.keyfile = \"/emqx/apps/emqx_auth_mysql/test/emqx_auth_mysql_SUITE_data/client-key.pem\"|g' apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf
sed -i 's|^[#[:space:]]*auth.mysql.ssl.enable[[:space:]]*=.*|auth.mysql.ssl.enable = on|g' apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf
sed -i 's|^[#[:space:]]*auth.mysql.ssl.cacertfile[[:space:]]*=.*|auth.mysql.ssl.cacertfile = \"/emqx/apps/emqx_auth_mysql/test/emqx_auth_mysql_SUITE_data/ca.pem\"|g' apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf
sed -i 's|^[#[:space:]]*auth.mysql.ssl.certfile[[:space:]]*=.*|auth.mysql.ssl.certfile = \"/emqx/apps/emqx_auth_mysql/test/emqx_auth_mysql_SUITE_data/client-cert.pem\"|g' apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf
sed -i 's|^[#[:space:]]*auth.mysql.ssl.keyfile[[:space:]]*=.*|auth.mysql.ssl.keyfile = \"/emqx/apps/emqx_auth_mysql/test/emqx_auth_mysql_SUITE_data/client-key.pem\"|g' apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf
- name: setup
env:
MYSQL_TAG: ${{ matrix.mysql_tag }}

View File

@ -5,7 +5,8 @@
[![Coverage Status](https://coveralls.io/repos/github/emqx/emqx/badge.svg?branch=master)](https://coveralls.io/github/emqx/emqx?branch=master)
[![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx)](https://hub.docker.com/r/emqx/emqx)
[![Slack Invite](<https://slack-invite.emqx.io/badge.svg>)](https://slack-invite.emqx.io)
[![Twitter](https://img.shields.io/badge/Twitter-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt)
[![Twitter](https://img.shields.io/badge/Follow-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt)
[![Community](https://img.shields.io/badge/Community-EMQ%20X-yellow?logo=github)](https://github.com/emqx/emqx/discussions)
[![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers)
@ -116,17 +117,11 @@ Visiting [EMQ X FAQ](https://docs.emqx.io/broker/latest/en/faq/faq.html) to get
The [EMQ X Roadmap uses Github milestones](https://github.com/emqx/emqx/milestones) to track the progress of the project.
## Community, discussion, contribution, and support
## Community
You can reach the EMQ community and developers via the following channels:
- [Slack](https://slack-invite.emqx.io/)
- [Twitter](https://twitter.com/emqtt)
- [Facebook](https://www.facebook.com/emqxmqtt)
- [Reddit](https://www.reddit.com/r/emqx/)
- [Forum](https://groups.google.com/d/forum/emqtt)
- [Blog](https://medium.com/@emqtt)
The EMQ X community can be found on [GitHub Discussions](https://github.com/emqx/emqx/discussions), where you can ask questions, voice ideas, and share your projects.
Please submit any bugs, issues, and feature requests to [emqx/emqx](https://github.com/emqx/emqx/issues).
To chat with other community members you can join the [EMQ X Slack](https://slack-invite.emqx.io).
## MQTT Specifications

View File

@ -1,5 +1,5 @@
{deps,
[{ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.1"}}}
[{ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.2"}}}
]}.
{edoc_opts, [{preprocess, true}]}.

View File

@ -60,6 +60,7 @@ load_acl_hook(DeviceDn) ->
if_enabled(Cfgs, Fun) ->
case get_env(Cfgs) of
{ok, []} -> ok;
{ok, InitArgs} -> Fun(InitArgs)
end.

View File

@ -33,7 +33,7 @@
]).
init(#{clientid_list := ClientidList, username_list := UsernameList}) ->
ok = ekka_mnesia:create_table(emqx_user, [
ok = ekka_mnesia:create_table(?TABLE, [
{disc_copies, [node()]},
{attributes, record_info(fields, emqx_user)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
@ -41,7 +41,7 @@ init(#{clientid_list := ClientidList, username_list := UsernameList}) ->
|| {Clientid, Password} <- ClientidList],
_ = [ add_default_user({{username, iolist_to_binary(Username)}, iolist_to_binary(Password)})
|| {Username, Password} <- UsernameList],
ok = ekka_mnesia:copy_table(emqx_user, disc_copies).
ok = ekka_mnesia:copy_table(?TABLE, disc_copies).
%% @private
add_default_user({Login, Password}) when is_tuple(Login) ->

View File

@ -128,7 +128,7 @@ add_clientid(_Bindings, Params) ->
Re = do_add_clientid(Params),
case Re of
ok -> return(ok);
{error, Error} -> {error, format_msg(Error)}
{error, Error} -> return({error, format_msg(Error)})
end
end.
@ -177,7 +177,7 @@ add_username(_Bindings, Params) ->
false ->
case do_add_username(Params) of
ok -> return(ok);
{error, Error} -> {error, format_msg(Error)}
{error, Error} -> return({error, format_msg(Error)})
end
end.

View File

@ -16,7 +16,7 @@ auth.mongo.type = single
##
## Value: String
##
## Examples: 127.0.0.1:27017,127.0.0.2:27017...
## Examples: "127.0.0.1:27017,127.0.0.2:27017,..."
auth.mongo.server = "127.0.0.1:27017"
## MongoDB pool size
@ -53,7 +53,7 @@ auth.mongo.database = mqtt
## Whether to enable SSL connection.
##
## Value: on | off
## auth.mongo.ssl = off
## auth.mongo.ssl.enable = off
## SSL keyfile.
##

View File

@ -23,12 +23,10 @@
%% FIXME: compatible with 4.0-4.2 version format, plan to delete in 5.0
{mapping, "auth.mongo.login", "emqx_auth_mongo.server", [
{default, ""},
{datatype, string}
]}.
{mapping, "auth.mongo.username", "emqx_auth_mongo.server", [
{default, ""},
{datatype, string}
]}.
@ -100,7 +98,7 @@
Pool = cuttlefish:conf_get("auth.mongo.pool", Conf),
%% FIXME: compatible with 4.0-4.2 version format, plan to delete in 5.0
Login = cuttlefish:conf_get("auth.mongo.username", Conf,
cuttlefish:conf_get("auth.mongo.login", Conf)
cuttlefish:conf_get("auth.mongo.login", Conf, "")
),
Passwd = cuttlefish:conf_get("auth.mongo.password", Conf),
DB = cuttlefish:conf_get("auth.mongo.database", Conf),
@ -140,7 +138,7 @@
Ssl = case cuttlefish:conf_get("auth.mongo.ssl.enable", Conf) of
on -> GenSsl;
off -> [];
true -> GenSsl;
true -> [{ssl, true}, {ssl_opts, SslOpts("auth.mongo.ssl_opts")}];
false -> []
end,

View File

@ -108,9 +108,9 @@ auth.mysql.acl_query = "select allow, ipaddr, username, clientid, access, topic
## Client ssl certificate.
##
## Value: File
## auth.mysql.ssl.certfile = path to your clientcert file
#auth.mysql.ssl.certfile = /path/to/your/clientcert.pem
## Client ssl keyfile.
##
## Value: File
## auth.mysql.ssl.keyfile = path to your clientkey file
#auth.mysql.ssl.keyfile = /path/to/your/clientkey.pem

View File

@ -36,23 +36,19 @@
]}.
{mapping, "auth.mysql.ssl.cafile", "emqx_auth_mysql.server", [
{default, ""},
{datatype, string}
]}.
{mapping, "auth.mysql.ssl.cacertfile", "emqx_auth_mysql.server", [
{default, ""},
{datatype, string}
]}.
%% FIXME: compatible with 4.0-4.2 version format, plan to delete in 5.0
{mapping, "auth.mysql.ssl.certfile", "emqx_auth_mysql.server", [
{default, ""},
{datatype, string}
]}.
{mapping, "auth.mysql.ssl.keyfile", "emqx_auth_mysql.server", [
{default, ""},
{datatype, string}
]}.
@ -87,19 +83,22 @@
{encoding, utf8},
{query_timeout, Timeout},
{keep_alive, true}],
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
Options1 =
case cuttlefish:conf_get("auth.mysql.ssl.enable", Conf) of
true ->
%% FIXME: compatible with 4.0-4.2 version format, plan to delete in 5.0
CA = cuttlefish:conf_get("auth.mysql.ssl.cacertfile", Conf,
cuttlefish:conf_get("auth.mysql.ssl.cafile", Conf)
CA = cuttlefish:conf_get(
"auth.mysql.ssl.cacertfile", Conf,
cuttlefish:conf_get("auth.mysql.ssl.cafile", Conf, undefined)
),
Cert = cuttlefish:conf_get("auth.mysql.ssl.certfile", Conf),
Key = cuttlefish:conf_get("auth.mysql.ssl.keyfile", Conf),
Options ++ [{ssl, {server_name_indication, disable},
Cert = cuttlefish:conf_get("auth.mysql.ssl.certfile", Conf, undefined),
Key = cuttlefish:conf_get("auth.mysql.ssl.keyfile", Conf, undefined),
Options ++ [{ssl, Filter([{server_name_indication, disable},
{cacertfile, CA},
{certfile, Cert},
{keyfile, Key}}];
{keyfile, Key}])
}];
_ ->
Options
end,

View File

@ -164,22 +164,32 @@ t_check_auth(_) ->
BcryptFoo = #{clientid => <<"bcrypt_foo">>, username => <<"bcrypt_foo">>, zone => external},
User1 = #{clientid => <<"bcrypt_foo">>, username => <<"user">>, zone => external},
Bcrypt = #{clientid => <<"bcrypt">>, username => <<"bcrypt">>, zone => external},
%
BcryptWrong = #{clientid => <<"bcrypt_wrong">>, username => <<"bcrypt_wrong">>, zone => external},
reload([{password_hash, plain}]),
{ok, #{is_superuser := true}} = emqx_access_control:authenticate(Plain#{password => <<"plain">>}),
{ok,#{is_superuser := true}} =
emqx_access_control:authenticate(Plain#{password => <<"plain">>}),
reload([{password_hash, md5}]),
{ok, #{is_superuser := false}} = emqx_access_control:authenticate(Md5#{password => <<"md5">>}),
{ok,#{is_superuser := false}} =
emqx_access_control:authenticate(Md5#{password => <<"md5">>}),
reload([{password_hash, sha}]),
{ok, #{is_superuser := false}} = emqx_access_control:authenticate(Sha#{password => <<"sha">>}),
{ok,#{is_superuser := false}} =
emqx_access_control:authenticate(Sha#{password => <<"sha">>}),
reload([{password_hash, sha256}]),
{ok, #{is_superuser := false}} = emqx_access_control:authenticate(Sha256#{password => <<"sha256">>}),
{ok,#{is_superuser := false}} =
emqx_access_control:authenticate(Sha256#{password => <<"sha256">>}),
reload([{password_hash, bcrypt}]),
{ok, #{is_superuser := false}} = emqx_access_control:authenticate(Bcrypt#{password => <<"password">>}),
reload([{password_hash, {pbkdf2, sha, 1, 16}}, {auth_query, "select password, salt from mqtt_user where username = '%u' limit 1"}]),
{ok, #{is_superuser := false}} = emqx_access_control:authenticate(Pbkdf2#{password => <<"password">>}),
{ok,#{is_superuser := false}} =
emqx_access_control:authenticate(Bcrypt#{password => <<"password">>}),
{error, not_authorized} =
emqx_access_control:authenticate(BcryptWrong#{password => <<"password">>}),
%%pbkdf2 sha
reload([{password_hash, {pbkdf2, sha, 1, 16}},
{auth_query, "select password, salt from mqtt_user where username = '%u' limit 1"}]),
{ok,#{is_superuser := false}} =
emqx_access_control:authenticate(Pbkdf2#{password => <<"password">>}),
reload([{password_hash, {salt, bcrypt}}]),
{ok, #{is_superuser := false}} = emqx_access_control:authenticate(BcryptFoo#{password => <<"foo">>}),
{ok,#{is_superuser := false}} =
emqx_access_control:authenticate(BcryptFoo#{password => <<"foo">>}),
{error, _} = emqx_access_control:authenticate(User1#{password => <<"foo">>}),
{error, not_authorized} = emqx_access_control:authenticate(Bcrypt#{password => <<"password">>}).

View File

@ -45,30 +45,22 @@ auth.pgsql.ssl.enable = off
## Example:
## tlsv1.1,tlsv1.2,tlsv1.3
##
## auth.pgsql.ssl_opts.tls_versions = tlsv1.2
## TLS version
## You can configure multi-version use "," split,
## default value is :tlsv1.2
## Example:
## tlsv1.1,tlsv1.2,tlsv1.3
##
## auth.pgsql.ssl.tls_versions = tlsv1.2
#auth.pgsql.ssl.tls_versions = tlsv1.2
## SSL keyfile.
##
## Value: File
## auth.pgsql.ssl.keyfile =
#auth.pgsql.ssl.keyfile =
## SSL certfile.
##
## Value: File
## auth.pgsql.ssl.certfile =
#auth.pgsql.ssl.certfile =
## SSL cacertfile.
##
## Value: File
## auth.pgsql.ssl.cacertfile =
#auth.pgsql.ssl.cacertfile =
## Authentication query.
##

View File

@ -107,7 +107,7 @@
Ssl = case cuttlefish:conf_get("auth.pgsql.ssl.enable", Conf) of
on -> GenSsl;
off -> [];
true -> GenSsl;
true -> [{ssl, true}, {ssl_opts, SslOpts("auth.pgsql.ssl_opts")}];
false -> []
end,

View File

@ -29,7 +29,10 @@
, equery/3
]).
-type client_info() :: #{username:=_, clientid:=_, peerhost:=_, _=>_}.
-type client_info() :: #{username := _,
clientid := _,
peerhost := _,
_ => _}.
%%--------------------------------------------------------------------
%% Avoid SQL Injection: Parse SQL to Parameter Query.
@ -56,6 +59,11 @@ pgvar(Sql, Params) ->
%% PostgreSQL Connect/Query
%%--------------------------------------------------------------------
%% Due to a bug in epgsql the caluse for `econnrefused` is not recognised by
%% dialyzer, result in this error:
%% The pattern {'error', Reason = 'econnrefused'} can never match the type ...
%% https://github.com/epgsql/epgsql/issues/246
-dialyzer([{nowarn_function, [connect/1]}]).
connect(Opts) ->
Host = proplists:get_value(host, Opts),
Username = proplists:get_value(username, Opts),
@ -64,6 +72,9 @@ connect(Opts) ->
{ok, C} ->
conn_post(C),
{ok, C};
{error, Reason = econnrefused} ->
?LOG(error, "[Postgres] Can't connect to Postgres server: Connection refused."),
{error, Reason};
{error, Reason = invalid_authorization_specification} ->
?LOG(error, "[Postgres] Can't connect to Postgres server: Invalid authorization specification."),
{error, Reason};

View File

@ -121,6 +121,7 @@ t_placeholders(_) ->
emqx_access_control:authenticate(ClientA#{password => <<"plain">>}),
{ok, _} =
emqx_access_control:authenticate(ClientA#{password => <<"plain">>, peerhost => {192,168,1,5}}).
t_check_auth(_) ->
Plain = #{clientid => <<"client1">>, username => <<"plain">>, zone => external},
Md5 = #{clientid => <<"md5">>, username => <<"md5">>, zone => external},

View File

@ -39,31 +39,41 @@
]}.
{mapping, "auth.redis.ssl.cacertfile", "emqx_auth_redis.options", [
{default, ""},
{datatype, string}
]}.
{mapping, "auth.redis.ssl.certfile", "emqx_auth_redis.options", [
{default, ""},
{datatype, string}
]}.
{mapping, "auth.redis.ssl.keyfile", "emqx_auth_redis.options", [
{default, ""},
{datatype, string}
]}.
{translation, "emqx_auth_redis.options", fun(Conf) ->
case cuttlefish:conf_get("auth.redis.ssl.enable", Conf, false) of
Ssl = cuttlefish:conf_get("auth.redis.ssl.enable", Conf, false),
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
case Ssl of
true ->
CA = cuttlefish:conf_get("auth.redis.ssl.cacertfile", Conf),
Cert = cuttlefish:conf_get("auth.redis.ssl.certfile", Conf),
Key = cuttlefish:conf_get("auth.redis.ssl.keyfile", Conf),
[{options, [{ssl_options, [{cacertfile, CA},
%% FIXME: compatible with 4.0-4.2 version format, plan to delete in 5.0
CA = cuttlefish:conf_get(
"auth.redis.ssl.cacertfile", Conf,
cuttlefish:conf_get("auth.redis.cacertfile", Conf, undefined)
),
Cert = cuttlefish:conf_get(
"auth.redis.ssl.certfile", Conf,
cuttlefish:conf_get("auth.redis.certfile", Conf, undefined)
),
Key = cuttlefish:conf_get(
"auth.redis.ssl.keyfile", Conf,
cuttlefish:conf_get("auth.redis.keyfile", Conf, undefined)
),
[{options, [{ssl_options,
Filter([{cacertfile, CA},
{certfile, Cert},
{keyfile, Key}]}]}];
false ->
[{options, []}]
{keyfile, Key}])
}]}];
_ -> [{options, []}]
end
end}.

View File

@ -152,7 +152,7 @@ bridge.mqtt.aws.batch_size = 32
## 0 means infinity (no limit on the inflight window)
##
## Value: Integer
bridge.mqtt.aws.max_inflight = 32
bridge.mqtt.aws.max_inflight_size = 32
## Base directory for replayq to store messages on disk
## If this config entry is missing or set to undefined,

View File

@ -103,7 +103,7 @@
{datatype, {duration, s}}
]}.
{mapping, "bridge.mqtt.$name.max_inflight", "emqx_bridge_mqtt.bridges", [
{mapping, "bridge.mqtt.$name.max_inflight_size", "emqx_bridge_mqtt.bridges", [
{default, 0},
{datatype, integer}
]}.
@ -222,6 +222,8 @@
mqttv5 -> v5;
_ -> v4
end};
Tr(max_inflight_size, Size, Cfg) ->
Cfg#{max_inflight => Size};
Tr(Key, Value, Cfg) ->
Cfg#{Key => Value}
end,

View File

@ -132,8 +132,8 @@
%% and work as message batch transport layer
%% reconnect_delay_ms: Delay in milli-seconds for the bridge worker to retry
%% in case of transportation failure.
%% max_inflight_batches: Max number of batches allowed to send-ahead before
%% receiving confirmation from remote node/cluster
%% max_inflight: Max number of batches allowed to send-ahead before receiving
%% confirmation from remote node/cluster
%% mountpoint: The topic mount point for messages sent to remote node/cluster
%% `undefined', `<<>>' or `""' to disable
%% forwards: Local topics to subscribe.

View File

@ -259,13 +259,13 @@ handle_received_create(TopicPrefix, MaxAge, Payload) ->
{error, bad_request}
end.
%% http_uri:decode/1 is deprecated in OTP-23
%% its equivalent uri_string:percent_decode however is not available before OTP 23
-if(?OTP_RELEASE >= 23).
percent_decode(Topic) -> uri_string:percent_decode(Topic).
-else.
percent_decode(Topic) -> http_uri:decode(Topic).
-endif.
%% @private Copy from http_uri.erl which has been deprecated since OTP-23
percent_decode(<<$%, Hex:2/binary, Rest/bits>>) ->
<<(binary_to_integer(Hex, 16)), (percent_decode(Rest))/binary>>;
percent_decode(<<First:1/binary, Rest/bits>>) ->
<<First/binary, (percent_decode(Rest))/binary>>;
percent_decode(<<>>) ->
<<>>.
%% When topic is timeout, server should return nocontent here,
%% but gen_coap only receive return value of #coap_content from coap_get, so temporarily we can't give the Code 2.07 {ok, nocontent} out.TBC!!!

View File

@ -192,8 +192,8 @@ do_register_hooks(Hook, ScriptName, _St) ->
?LOG(error, "Discard unknown hook type ~p from ~p", [Hook, ScriptName]).
do_unloadall(Scripts) ->
lists:foreach(fun do_unload/1, Scripts),
ok.
lists:foreach(fun do_unload/1, Scripts).
do_unload(Script) ->
emqx_lua_script:unregister_hooks(Script), ok.
emqx_lua_script:unregister_hooks(Script),
ok.

View File

@ -99,11 +99,11 @@ add_app(AppId, Name, Secret, Desc, Status, Expired) when is_binary(AppId) ->
AddFun = fun() ->
case mnesia:wread({mqtt_app, AppId}) of
[] -> mnesia:write(App);
_ -> mnesia:abort(alread_existed), ok
_ -> mnesia:abort(alread_existed)
end
end,
case mnesia:transaction(AddFun) of
{atomic, _} -> {ok, Secret1};
{atomic, ok} -> {ok, Secret1};
{aborted, Reason} -> {error, Reason}
end.

View File

@ -113,7 +113,7 @@ on_resource_create(_Name, Conf) ->
%%------------------------------------------------------------------------------
%% Action 'inspect'
%%------------------------------------------------------------------------------
-spec on_action_create_inspect(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
-spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
on_action_create_inspect(Id, Params) ->
Params.

View File

@ -43,7 +43,6 @@
]).
-export([ init_resource/4
, init_resource/5
, init_action/4
, clear_resource/3
, clear_rule/1
@ -239,7 +238,7 @@ create_resource(#{type := Type, config := Config0} = Params) ->
ok = emqx_rule_registry:add_resource(Resource),
%% Note that we will return OK in case of resource creation failure,
%% A timer is started to re-start the resource later.
catch cluster_call(init_resource, [M, F, ResId, Config, true]),
catch cluster_call(init_resource, [M, F, ResId, Config]),
{ok, Resource};
not_found ->
{error, {resource_type_not_found, Type}}
@ -382,24 +381,23 @@ delete_resource(ResId) ->
-spec(refresh_resources() -> ok).
refresh_resources() ->
lists:foreach(fun(#resource{id = ResId} = Res) ->
try refresh_resource(Res)
lists:foreach(fun refresh_resource/1,
emqx_rule_registry:get_resources()).
refresh_resource(Type) when is_atom(Type) ->
lists:foreach(fun refresh_resource/1,
emqx_rule_registry:get_resources_by_type(Type));
refresh_resource(#resource{id = ResId, config = Config, type = Type}) ->
{ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type),
try cluster_call(init_resource, [M, F, ResId, Config])
catch Error:Reason:ST ->
logger:critical(
"Can not re-stablish resource ~p: ~0p. The resource is disconnected."
"Fix the issue and establish it manually.\n"
"Stacktrace: ~0p",
[ResId, {Error, Reason}, ST])
end
end, emqx_rule_registry:get_resources()).
refresh_resource(Type) when is_atom(Type) ->
lists:foreach(fun(Resource) ->
refresh_resource(Resource)
end, emqx_rule_registry:get_resources_by_type(Type));
refresh_resource(#resource{id = ResId, config = Config, type = Type}) ->
{ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type),
cluster_call(init_resource, [M, F, ResId, Config]).
end.
-spec(refresh_rules() -> ok).
refresh_rules() ->
@ -531,14 +529,10 @@ cluster_call(Func, Args) ->
end.
init_resource(Module, OnCreate, ResId, Config) ->
init_resource(Module, OnCreate, ResId, Config, false).
init_resource(Module, OnCreate, ResId, Config, Restart) ->
Params = ?RAISE(
Module:OnCreate(ResId, Config),
Restart andalso
timer:apply_after(timer:seconds(60), ?MODULE, init_resource,
[Module, OnCreate, ResId, Config, Restart]),
begin
Module:OnCreate(ResId, Config)
end,
{{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
ResParams = #resource_params{id = ResId,
params = Params,

View File

@ -334,7 +334,7 @@ update_resource(#{id := Id}, NewParams) ->
end,
P2 = case proplists:get_value(<<"config">>, NewParams) of
undefined -> #{};
<<"{}">> -> #{};
[{}] -> #{};
Map -> #{<<"config">> => ?RAISE(maps:from_list(Map), {invalid_config, Map})}
end,
case emqx_rule_engine:update_resource(Id, maps:merge(P1, P2)) of
@ -342,13 +342,13 @@ update_resource(#{id := Id}, NewParams) ->
return(ok);
{error, not_found} ->
?LOG(error, "resource not found: ~0p", [Id]),
return({error, 400, list_to_binary("resource not found:" ++ binary_to_list(Id))});
return({error, 400, <<"resource not found:", Id/binary>>});
{error, {init_resource_failure, _}} ->
?LOG(error, "init resource failure: ~0p", [Id]),
return({error, 500, list_to_binary("init resource failure:" ++ binary_to_list(Id))});
return({error, 500, <<"init resource failure:", Id/binary>>});
{error, {dependency_exists, RuleId}} ->
?LOG(error, "dependency exists: ~0p", [RuleId]),
return({error, 500, list_to_binary("resource dependency by rule:" ++ binary_to_list(RuleId))});
return({error, 500, <<"resource dependency by rule:", RuleId/binary>>});
{error, Reason} ->
?LOG(error, "update resource failed: ~0p", [Reason]),
return({error, 500, <<"update resource failed,error info have been written to logfile!">>})

View File

@ -16,7 +16,6 @@
-module(emqx_sn_proper_types).
%-include("emqx_sn.hrl").
-include_lib("emqx_sn/include/emqx_sn.hrl").
-include_lib("proper/include/proper.hrl").

View File

@ -16,7 +16,6 @@
-module(prop_emqx_sn_frame).
%-include("emqx_sn.hrl").
-include_lib("emqx_sn/include/emqx_sn.hrl").
-include_lib("proper/include/proper.hrl").

View File

@ -41,7 +41,7 @@ web.hook.body.encoding_of_payload_field = plain
## Turn on peer certificate verification
##
## Value: true | false
## web.hook.ssl.verify = true
## web.hook.ssl.verify = false
## Connection process pool size
##

View File

@ -27,7 +27,7 @@
]}.
{mapping, "web.hook.ssl.verify", "emqx_web_hook.verify", [
{default, true},
{default, false},
{datatype, {enum, [true, false]}}
]}.

View File

@ -1,8 +1,7 @@
{plugins, [rebar3_proper]}.
{deps,
[{ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.1"}}},
{emqx_rule_engine, {git, "https://github.com/emqx/emqx-rule-engine"}}
[{ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.2"}}}
]}.
{edoc_opts, [{preprocess, true}]}.

View File

@ -40,7 +40,7 @@
default => 5,
title => #{en => <<"Connect Timeout">>,
zh => <<"连接超时时间"/utf8>>},
description => #{en => <<"Connect Timeout In Seconds">>,
description => #{en => <<"Connect timeout in seconds">>,
zh => <<"连接超时时间,单位秒"/utf8>>}},
request_timeout => #{
order => 3,
@ -48,7 +48,7 @@
default => 5,
title => #{en => <<"Request Timeout">>,
zh => <<"请求超时时间时间"/utf8>>},
description => #{en => <<"Request Timeout In Seconds">>,
description => #{en => <<"Request timeout in seconds">>,
zh => <<"请求超时时间,单位秒"/utf8>>}},
cacertfile => #{
order => 4,
@ -56,7 +56,7 @@
default => <<>>,
title => #{en => <<"CA Certificate File">>,
zh => <<"CA 证书文件"/utf8>>},
description => #{en => <<"CA Certificate File.">>,
description => #{en => <<"CA certificate file.">>,
zh => <<"CA 证书文件。"/utf8>>}
},
certfile => #{
@ -65,7 +65,7 @@
default => <<>>,
title => #{en => <<"Certificate File">>,
zh => <<"证书文件"/utf8>>},
description => #{en => <<"Certificate File.">>,
description => #{en => <<"Certificate file.">>,
zh => <<"证书文件。"/utf8>>}
},
keyfile => #{
@ -80,7 +80,7 @@
verify => #{
order => 7,
type => boolean,
default => true,
default => false,
title => #{en => <<"Verify">>,
zh => <<"Verify"/utf8>>},
description => #{en => <<"Turn on peer certificate verification.">>,
@ -92,8 +92,8 @@
default => 32,
title => #{en => <<"Pool Size">>,
zh => <<"连接池大小"/utf8>>},
description => #{en => <<"Pool Size for HTTP Server.">>,
zh => <<"HTTP Server 连接池大小。"/utf8>>}
description => #{en => <<"Pool size for HTTP server.">>,
zh => <<"HTTP server 连接池大小。"/utf8>>}
}
}).
@ -263,15 +263,15 @@ on_action_data_to_webserver(Selected, _Envs =
Req = create_req(Method, NPath, Headers, NBody),
case ehttpc:request(ehttpc_pool:pick_worker(Pool, ClientID), Method, Req, RequestTimeout) of
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
ok;
emqx_rule_metrics:inc_actions_success(Id);
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
ok;
emqx_rule_metrics:inc_actions_success(Id);
{ok, StatusCode, _} ->
?LOG(warning, "[WebHook Action] HTTP request failed with status code: ~p", [StatusCode]),
ok;
emqx_rule_metrics:inc_actions_error(Id);
{ok, StatusCode, _, _} ->
?LOG(warning, "[WebHook Action] HTTP request failed with status code: ~p", [StatusCode]),
ok;
emqx_rule_metrics:inc_actions_error(Id);
{error, Reason} ->
?LOG(error, "[WebHook Action] HTTP request error: ~p", [Reason]),
emqx_rule_metrics:inc_actions_error(Id)

View File

@ -52,10 +52,10 @@ translate_env() ->
{ok, URL} = application:get_env(?APP, url),
#{host := Host0,
path := Path0,
scheme := Scheme} = URIMap = uri_string:parse(add_default_scheme(URL)),
scheme := Scheme} = URIMap = uri_string:parse(add_default_scheme(uri_string:normalize(URL))),
Port = maps:get(port, URIMap, case Scheme of
"https" -> 443;
_ -> 80
"http" -> 80
end),
Path = path(Path0),
{Inet, Host} = parse_host(Host0),

View File

@ -68,7 +68,7 @@ set_special_configs_https(_) ->
Path = emqx_ct_helpers:deps_path(emqx_web_hook, "test/emqx_web_hook_SUITE_data/"),
SslOpts = [{keyfile, Path ++ "/client-key.pem"},
{certfile, Path ++ "/client-cert.pem"},
{cafile, Path ++ "/ca.pem"}],
{cacertfile, Path ++ "/ca.pem"}],
application:set_env(emqx_web_hook, ssl, true),
application:set_env(emqx_web_hook, ssloptions, SslOpts),
application:set_env(emqx_web_hook, url, "https://127.0.0.1:8888").

View File

@ -246,15 +246,11 @@ if [ -z "$RELX_CONFIG_PATH" ]; then
fi
fi
# Extract the target node name from node.args
if [ -z "$NAME_ARG" ]; then
if [ ! -z "$EMQX_NODE_NAME" ]; then
NODENAME="$EMQX_NODE_NAME"
elif [ ! -z `ps -ef | grep "$ERTS_PATH/beam.smp" | grep -o -E '\-name (\S*)' | awk '{print $2}'` ]; then
NODENAME=`ps -ef | grep "$ERTS_PATH/beam.smp" | grep -o -E '\-name (\S*)' | awk '{print $2}'`
else
NODENAME=`egrep '^[ \t]*node.name[ \t]*=[ \t]*' "$RUNNER_ETC_DIR/emqx.conf" 2> /dev/null | tail -1 | cut -d = -f 2-`
fi
NODENAME="${EMQX_NODE_NAME:-}"
# check if there is a node running, inspect its name
[ -z "$NODENAME" ] && NODENAME=`ps -ef | grep -E '\-progname\s.*emqx\s' | grep -o -E '\-name (\S*)' | awk '{print $2}'`
[ -z "$NODENAME" ] && NODENAME=`egrep '^[ \t]*node.name[ \t]*=[ \t]*' "$RUNNER_ETC_DIR/emqx.conf" 2> /dev/null | tail -1 | cut -d = -f 2-`
if [ -z "$NODENAME" ]; then
echoerr "vm.args needs to have a -name parameter."
echoerr " -sname is not supported."
@ -268,18 +264,17 @@ fi
# Extract the name type and name from the NAME_ARG for REMSH
NAME_TYPE="$(echo "$NAME_ARG" | awk '{print $1}')"
NAME="$(echo "$NAME_ARG" | awk '{print $2}')"
NODENAME="$(echo "$NAME" | awk -F'@' '{print $1}')"
export ESCRIPT_NAME="$NODENAME"
PIPE_DIR="${PIPE_DIR:-/$RUNNER_DATA_DIR/${WHOAMI}_erl_pipes/$NAME/}"
# Extract the target cookie
if [ -z "$COOKIE_ARG" ]; then
if [ ! -z "$EMQX_NODE_COOKIE" ]; then
COOKIE="$EMQX_NODE_COOKIE"
elif [ ! -z `ps -ef | grep "$ERTS_PATH/beam.smp" | grep -o -E '\-setcookie (\S*)' | awk '{print $2}'` ]; then
COOKIE=`ps -ef | grep "$ERTS_PATH/beam.smp" | grep -o -E '\-setcookie (\S*)' | awk '{print $2}'`
else
COOKIE=`egrep '^[ \t]*node.cookie[ \t]*=[ \t]*' "$RUNNER_ETC_DIR/emqx.conf" 2> /dev/null | tail -1 | cut -d = -f 2-`
fi
COOKIE="${EMQX_NODE_COOKIE:-}"
# check if there is a node running, steal its cookie
[ -z "$COOKIE" ] && COOKIE=`ps -ef | grep -E '\-progname\s.*emqx\s' | grep -o -E '\-setcookie (\S*)' | awk '{print $2}'`
[ -z "$COOKIE" ] && COOKIE=`egrep '^[ \t]*node.cookie[ \t]*=[ \t]*' "$RUNNER_ETC_DIR/emqx.conf" 2> /dev/null | tail -1 | cut -d = -f 2-`
if [ -z "$COOKIE" ]; then
echoerr "vm.args needs to have a -setcookie parameter."
echoerr "please check $RUNNER_ETC_DIR/emqx.conf"

View File

@ -31,19 +31,15 @@ relx_nodetool() {
}
# Extract the target node name from node.args
if [ -z "$NAME_ARG" ]; then
if [ ! -z "$EMQX_NODE_NAME" ]; then
NODENAME="$EMQX_NODE_NAME"
elif [ ! -z `ps -ef | grep "$ERTS_PATH/beam.smp" | grep -o -E '\-name (\S*)' | awk '{print $2}'` ]; then
NODENAME=`ps -ef | grep "$ERTS_PATH/beam.smp" | grep -o -E '\-name (\S*)' | awk '{print $2}'`
else
NODENAME=`egrep '^[ \t]*node.name[ \t]*=[ \t]*' $RUNNER_ETC_DIR/emqx.conf 2> /dev/null | tail -1 | cut -d = -f 2-`
fi
NODENAME="${EMQX_NODE_NAME:-}"
# check if there is a node running, inspect its name
[ -z "$NODENAME" ] && NODENAME=`ps -ef | grep -E '\progname\s.*emqx\s' | grep -o -E '\-name (\S*)' | awk '{print $2}'`
[ -z "$NODENAME" ] && NODENAME=`egrep '^[ \t]*node.name[ \t]*=[ \t]*' "$RUNNER_ETC_DIR/emqx.conf" 2> /dev/null | tail -1 | cut -d = -f 2-`
if [ -z "$NODENAME" ]; then
echoerr "vm.args needs to have a -name parameter."
echoerr " -sname is not supported."
echoerr "please check $RUNNER_ETC_DIR/emqx.conf"
echoerr "perhaps you do not have read permissions on $RUNNER_ETC_DIR/emqx.conf"
exit 1
else
NAME_ARG="-name ${NODENAME# *}"
@ -56,13 +52,10 @@ NAME="$(echo "$NAME_ARG" | awk '{print $2}')"
# Extract the target cookie
if [ -z "$COOKIE_ARG" ]; then
if [ ! -z "$EMQX_NODE_COOKIE" ]; then
COOKIE="$EMQX_NODE_COOKIE"
elif [ ! -z `ps -ef | grep "$ERTS_PATH/beam.smp" | grep -o -E '\-setcookie (\S*)' | awk '{print $2}'` ]; then
COOKIE=`ps -ef | grep "$ERTS_PATH/beam.smp" | grep -o -E '\-setcookie (\S*)' | awk '{print $2}'`
else
COOKIE=`egrep '^[ \t]*node.cookie[ \t]*=[ \t]*' $RUNNER_ETC_DIR/emqx.conf 2> /dev/null | tail -1 | cut -d = -f 2-`
fi
COOKIE="${EMQX_NODE_COOKIE:-}"
# check if there is a node running, steal its cookie
[ -z "$COOKIE" ] && COOKIE=`ps -ef | grep -E '\-progname\s.*emqx\s' | grep -o -E '\-setcookie (\S*)' | awk '{print $2}'`
[ -z "$COOKIE" ] && COOKIE=`egrep '^[ \t]*node.cookie[ \t]*=[ \t]*' "$RUNNER_ETC_DIR/emqx.conf" 2> /dev/null | tail -1 | cut -d = -f 2-`
if [ -z "$COOKIE" ]; then
echoerr "vm.args needs to have a -setcookie parameter."
echoerr "please check $RUNNER_ETC_DIR/emqx.conf"

View File

@ -13,8 +13,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: v4.1.1
version: 4.3.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: v4.1.1
appVersion: latest

View File

@ -1,5 +1,5 @@
ARG BUILD_FROM=emqx/build-env:erl23.2.2-alpine-amd64
ARG RUN_FROM=alpine:3.11
ARG RUN_FROM=alpine:3.12
FROM ${BUILD_FROM} AS builder
ARG QEMU_ARCH=x86_64
@ -35,15 +35,15 @@ LABEL org.label-schema.docker.dockerfile="Dockerfile" \
org.label-schema.name="emqx" \
org.label-schema.version=${PKG_VSN} \
org.label-schema.description="EMQ (Erlang MQTT Broker) is a distributed, massively scalable, highly extensible MQTT messaging broker written in Erlang/OTP." \
org.label-schema.url="http://emqx.io" \
org.label-schema.url="https://emqx.io" \
org.label-schema.vcs-type="Git" \
org.label-schema.vcs-url="https://github.com/emqx/emqx-docker" \
org.label-schema.vcs-url="https://github.com/emqx/emqx" \
maintainer="Raymond M Mouthaan <raymondmmouthaan@gmail.com>, Huang Rui <vowstar@gmail.com>, EMQ X Team <support@emqx.io>"
ARG QEMU_ARCH=x86_64
ARG EMQX_NAME=emqx
COPY deploy/docker/docker-entrypoint.sh deploy/docker/start.sh tmp/qemu-$QEMU_ARCH-stati* /usr/bin/
COPY deploy/docker/docker-entrypoint.sh tmp/qemu-$QEMU_ARCH-stati* /usr/bin/
COPY --from=builder /emqx/_build/$EMQX_NAME/rel/emqx /opt/emqx
RUN ln -s /opt/emqx/bin/* /usr/local/bin/
@ -59,7 +59,7 @@ RUN chgrp -Rf emqx /opt/emqx && chmod -Rf g+w /opt/emqx \
USER emqx
VOLUME ["/opt/emqx/log", "/opt/emqx/data", "/opt/emqx/lib", "/opt/emqx/etc"]
VOLUME ["/opt/emqx/log", "/opt/emqx/data", "/opt/emqx/etc"]
# emqx will occupy these port:
# - 1883 port for MQTT
@ -69,12 +69,12 @@ VOLUME ["/opt/emqx/log", "/opt/emqx/data", "/opt/emqx/lib", "/opt/emqx/etc"]
# - 8883 port for MQTT(SSL)
# - 11883 port for internal MQTT/TCP
# - 18083 for dashboard
# - 4369 for port mapping (epmd)
# - 4370 for port mapping
# - 4369 epmd (Erlang-distrbution port mapper daemon) listener (deprecated)
# - 4370 default Erlang distrbution port
# - 5369 for gen_rpc port mapping
# - 6369 for distributed node
EXPOSE 1883 8081 8083 8084 8883 11883 18083 4369 4370 5369 6369
# - 6369 6370 for distributed node
EXPOSE 1883 8081 8083 8084 8883 11883 18083 4369 4370 5369 6369 6370
ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]
CMD ["/usr/bin/start.sh"]
CMD ["/opt/emqx/bin/emqx", "foreground"]

View File

@ -2,7 +2,7 @@
+ **Where to get help**:
https://emqx.io, https://github.com/emqx/emqx-rel, or https://github.com/emqx/emqx
https://emqx.io or https://github.com/emqx/emqx
+ **Where to file issues:**
@ -71,17 +71,6 @@ These environment variables will ignore for configuration file.
| ---------------------------| ------------------ | ------------------------- | ------------------------------------- |
| EMQX_NAME | container name | none | emqx node short name |
| EMQX_HOST | container IP | none | emqx node host, IP or FQDN |
| EMQX_WAIT_TIME | 5 | none | wait time in sec before timeout |
| EMQX_NODE__NAME | EMQX_NAME@EMQX_HOST| node.name | Erlang node name, name@ipaddress/host |
| EMQX_NODE__COOKIE | emqx_dist_cookie | node.cookie | cookie for cluster |
| EMQX_LOG__CONSOLE | console | log.console | log console output method |
| EMQX_ALLOW_ANONYMOUS | true | allow_anonymous | allow mqtt anonymous login |
| EMQX_LISTENER__TCP__EXTERNAL| 1883 | listener.tcp.external | MQTT TCP port |
| EMQX_LISTENER__SSL__EXTERNAL| 8883 | listener.ssl.external | MQTT TCP TLS/SSL port |
| EMQX_LISTENER__WS__EXTERNAL | 8083 | listener.ws.external | HTTP and WebSocket port |
| EMQX_LISTENER__WSS__EXTERNAL| 8084 | listener.wss.external | HTTPS and WSS port |
| EMQX_LISTENER__API__MGMT | 8080 | listener.api.mgmt | MGMT API port |
| EMQX_MQTT__MAX_PACKET_SIZE | 64KB | mqtt.max_packet_size | Max Packet Size Allowed |
The list is incomplete and may changed with [etc/emqx.conf](https://github.com/emqx/emqx/blob/master/etc/emqx.conf) and plugin configuration files. But the mapping rule is similar.
@ -129,6 +118,7 @@ Default environment variable ``EMQX_LOADED_PLUGINS``, including
+ ``emqx_recon``
+ ``emqx_retainer``
+ ``emqx_rule_engine``
+ ``emqx_management``
+ ``emqx_dashboard``
@ -222,7 +212,7 @@ Let's create a static node list cluster from docker-compose.
services:
emqx1:
image: emqx/emqx:v4.0.0
image: emqx/emqx:latest
environment:
- "EMQX_NAME=emqx"
- "EMQX_HOST=node1.emqx.io"
@ -234,7 +224,7 @@ Let's create a static node list cluster from docker-compose.
- node1.emqx.io
emqx2:
image: emqx/emqx:v4.0.0
image: emqx/emqx:latest
environment:
- "EMQX_NAME=emqx"
- "EMQX_HOST=node2.emqx.io"
@ -301,7 +291,7 @@ services:
### Kernel Tuning
Under linux host machine, the easiest way is [tuning host machine's kernel](https://docs.emqx.io/broker/latest/en/tutorial/turn.html#turning-guide).
Under linux host machine, the easiest way is [Tuning guide](https://docs.emqx.io/en/broker/latest/tutorial/tune.html#linux-kernel-tuning).
If you want tune linux kernel by docker, you must ensure your docker is latest version (>=1.12).

View File

@ -40,10 +40,6 @@ if [[ -z "$EMQX_HOST" ]]; then
export EMQX_HOST
fi
if [[ -z "$EMQX_WAIT_TIME" ]]; then
export EMQX_WAIT_TIME=5
fi
if [[ -z "$EMQX_NODE_NAME" ]]; then
export EMQX_NODE_NAME="$EMQX_NAME@$EMQX_HOST"
fi

View File

@ -1,62 +0,0 @@
#!/bin/sh
set -e -u
EMQX_WAIT_TIME=${EMQX_WAIT_TIME:-5}
emqx_exit(){
# At least erlang.log.1 exists
if [ -f /opt/emqx/log/erlang.log.1 ]; then
# tail emqx.log.*
erlang_log=$(echo $(ls -t /opt/emqx/log/erlang.log.*) | awk '{print $1}')
num=$(sed -n -e '/LOGGING STARTED/=' ${erlang_log} | tail -1)
[ ! -z $num ] && [ $num -gt 2 ] && tail -n +$(expr $num - 2) ${erlang_log}
fi
echo "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqx exit abnormally"
exit 1
}
## EMQ Main script
# When receiving the EXIT signal, execute emqx_exit function
trap "emqx_exit" EXIT
# Start and run emqx, and when emqx crashed, this container will stop
/opt/emqx/bin/emqx start
# Sleep 5 seconds to wait for the loaded plugins catch up.
sleep 5
echo "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqx start"
## Fork tailing erlang.log, the fork is not killed after this script exits
## The assumption is that this is the docker entrypoint,
## hence docker container is terminated after entrypoint exists
tail -f /opt/emqx/log/erlang.log.1 &
# monitor emqx is running, or the docker must stop to let docker PaaS know
# warning: never use infinite loops such as `` while true; do sleep 1000; done`` here
# you must let user know emqx crashed and stop this container,
# and docker dispatching system can known and restart this container.
IDLE_TIME=0
MGMT_CONF='/opt/emqx/etc/plugins/emqx_management.conf'
MGMT_PORT=$(sed -n -r '/^management.listener.http.port[ \t]=[ \t].*$/p' $MGMT_CONF | sed -r 's/^management.listener.http.port = (.*)$/\1/g')
while [ $IDLE_TIME -lt 5 ]; do
IDLE_TIME=$(expr $IDLE_TIME + 1)
if curl http://localhost:${MGMT_PORT}/status >/dev/null 2>&1; then
IDLE_TIME=0
# Print the latest erlang.log
now_erlang_log=$(ps -ef |grep "tail -f /opt/emqx/log/erlang.log" |grep -v grep | sed -r "s/.*tail -f (.*)/\1/g")
new_erlang_log="$(ls -t /opt/emqx/log/erlang.log.* | head -1)"
if [ $now_erlang_log != $new_erlang_log ];then
tail -f $new_erlang_log &
kill $(ps -ef |grep "tail -f $now_erlang_log" | grep -v grep | awk '{print $1}')
fi
else
echo "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqx not running, waiting for recovery in $((25-IDLE_TIME*5)) seconds"
fi
sleep $EMQX_WAIT_TIME
done
# If running to here (the result 5 times not is running, thus in 25s emqx is not running), exit docker image
# Then the high level PaaS, e.g. docker swarm mode, will know and alert, rebanlance this service

View File

@ -38,7 +38,7 @@ status -p $pidfile -l $(basename $lockfile) $NAME >/dev/null 2>&1
running=$?
find_pid() {
ps ax | grep beam.smp | grep -E "\-progname.+$NAME" | awk '{print $1}'
ps ax | grep -E "\-progname\s+$NAME\s" | awk '{print $1}'
}
check_pid_status() {
@ -92,7 +92,7 @@ stop() {
hardstop() {
echo -n $"Shutting down $NAME: "
su - emqx -c "ps -ef | grep beam.smp | grep '\-progname $NAME ' | grep -v grep | awk '{print \$2}' | xargs kill -9"
su - emqx -c "ps -ef | grep -E '\-progname\s+$NAME\s' | awk '{print \$2}' | xargs kill -9"
for n in $(seq 1 10); do
sleep 1
check_pid_status

View File

@ -55,7 +55,7 @@ docker-build:
@docker build --no-cache \
--build-arg PKG_VSN=$(PKG_VSN) \
--build-arg BUILD_FROM=emqx/build-env:erl23.2.2-alpine-$(ARCH) \
--build-arg RUN_FROM=$(ARCH)/alpine:3.11 \
--build-arg RUN_FROM=$(ARCH)/alpine:3.12 \
--build-arg EMQX_NAME=$(EMQX_NAME) \
--build-arg QEMU_ARCH=$(QEMU_ARCH) \
--tag $(TARGET):build-$(OS)-$(ARCH) \

View File

@ -1554,10 +1554,16 @@ listener.ws.external.zone = external
## Value: ACL Rule
listener.ws.external.access.1 = "allow all"
## Verify if the protocol header is valid. Turn off for WeChat MiniApp.
## If set to true, the server fails if the client does not have a Sec-WebSocket-Protocol to send.
## Set to false for WeChat MiniApp.
##
## Value: on | off
listener.ws.external.verify_protocol_header = on
## Value: true | false
## listener.ws.external.fail_if_no_subprotocol = on
## Supported subprotocols
##
## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
## listener.ws.external.supported_protocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
## HAProxy or Nginx.
@ -1769,10 +1775,16 @@ listener.wss.external.zone = external
## Value: ACL Rule
listener.wss.external.access.1 = "allow all"
## See: listener.ws.external.verify_protocol_header
## If set to true, the server fails if the client does not have a Sec-WebSocket-Protocol to send.
## Set to false for WeChat MiniApp.
##
## Value: on | off
listener.wss.external.verify_protocol_header = on
## Value: true | false
## listener.wss.external.fail_if_no_subprotocol = true
## Supported subprotocols
##
## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
## listener.ws.external.supported_protocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
## Enable the Proxy Protocol V1/2 support.
##

View File

@ -1472,9 +1472,14 @@ end}.
{datatype, string}
]}.
{mapping, "listener.ws.$name.verify_protocol_header", "emqx.listeners", [
{default, on},
{datatype, flag}
{mapping, "listener.ws.$name.fail_if_no_subprotocol", "emqx.listeners", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "listener.ws.$name.supported_subprotocols", "emqx.listeners", [
{default, "mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5"},
{datatype, string}
]}.
{mapping, "listener.ws.$name.proxy_protocol", "emqx.listeners", [
@ -1638,9 +1643,14 @@ end}.
{datatype, string}
]}.
{mapping, "listener.wss.$name.verify_protocol_header", "emqx.listeners", [
{default, on},
{datatype, flag}
{mapping, "listener.wss.$name.fail_if_no_subprotocol", "emqx.listeners", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "listener.wss.$name.supported_subprotocols", "emqx.listeners", [
{default, "mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5"},
{datatype, string}
]}.
{mapping, "listener.wss.$name.access.$id", "emqx.listeners", [
@ -1891,7 +1901,8 @@ end}.
{rate_limit, RateLimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))},
{proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)},
{proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
{verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},
{fail_if_no_subprotocol, cuttlefish:conf_get(Prefix ++ ".fail_if_no_subprotocol", Conf, undefined)},
{supported_subprotocols, string:tokens(cuttlefish:conf_get(Prefix ++ ".supported_subprotocols", Conf, ""), ", ")},
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
{compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)},
{idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)},

View File

@ -50,7 +50,7 @@
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {branch, "hocon"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.3"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.0"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.2.0"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.1"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}}

View File

@ -0,0 +1,66 @@
#!/bin/bash
set -euo pipefail
## This script takes the first argument as docker iamge name,
## starts two containers running with the built code mount
## into docker containers.
##
## NOTE: containers are not instructed to rebuild emqx,
## Please use a docker image which is compatible with
## the docker host.
##
## EMQX can only start with longname (https://erlang.org/doc/reference_manual/distributed.html)
## The host name part of EMQX's node name has to be static, this means we should either
## pre-assign static IP for containers, or ensure containers can communiate with each other by name
## this is why a docker network is created, and the containers's names have a dot.
# ensure dir
cd -P -- "$(dirname -- "$0")/.."
IMAGE="${1}"
PROJ_DIR="$(pwd)"
NET='emqx.io'
NODE1="node1.$NET"
NODE2="node2.$NET"
COOKIE='this-is-a-secret'
## clean up
docker rm -f "$NODE1" >/dev/null 2>&1 || true
docker rm -f "$NODE2" >/dev/null 2>&1 || true
docker network rm "$NET" >/dev/null 2>&1 || true
docker network create "$NET"
docker run -d -t --restart=always --name "$NODE1" \
--net "$NET" \
-e EMQX_NODE_NAME="emqx@$NODE1" \
-e EMQX_NODE_COOKIE="$COOKIE" \
-e WAIT_FOR_ERLANG=60 \
-p 18083:18083 \
-v $PROJ_DIR/_build/emqx/rel/emqx:/built \
$IMAGE sh -c 'cp -r /built /emqx && /emqx/bin/emqx console'
docker run -d -t --restart=always --name "$NODE2" \
--net "$NET" \
-e EMQX_NODE_NAME="emqx@$NODE2" \
-e EMQX_NODE_COOKIE="$COOKIE" \
-e WAIT_FOR_ERLANG=60 \
-p 18084:18083 \
-v $PROJ_DIR/_build/emqx/rel/emqx:/built \
$IMAGE sh -c 'cp -r /built /emqx && /emqx/bin/emqx console'
wait (){
container="$1"
while ! docker exec "$container" /emqx/bin/emqx_ctl status >/dev/null 2>&1; do
echo -n '.'
sleep 1
done
}
wait $NODE1
wait $NODE2
echo
docker exec $NODE1 /emqx/bin/emqx_ctl cluster join "emqx@$NODE2"

View File

@ -192,16 +192,38 @@ init(Req, Opts) ->
end.
parse_sec_websocket_protocol(Req, Opts, WsOpts) ->
FailIfNoSubprotocol = proplists:get_value(fail_if_no_subprotocol, Opts),
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
undefined ->
%% TODO: why not reply 500???
{cowboy_websocket, Req, [Req, Opts], WsOpts};
[<<"mqtt", Vsn/binary>>] ->
Resp = cowboy_req:set_resp_header(
<<"sec-websocket-protocol">>, <<"mqtt", Vsn/binary>>, Req),
case FailIfNoSubprotocol of
true ->
{ok, cowboy_req:reply(400, Req), WsOpts};
false ->
{cowboy_websocket, Req, [Req, Opts], WsOpts}
end;
Subprotocols ->
SupportedSubprotocols = proplists:get_value(supported_subprotocols, Opts),
NSupportedSubprotocols = [list_to_binary(Subprotocol)
|| Subprotocol <- SupportedSubprotocols],
case pick_subprotocol(Subprotocols, NSupportedSubprotocols) of
{ok, Subprotocol} ->
Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>,
Subprotocol,
Req),
{cowboy_websocket, Resp, [Req, Opts], WsOpts};
_ ->
{error, no_supported_subprotocol} ->
{ok, cowboy_req:reply(400, Req), WsOpts}
end
end.
pick_subprotocol([], _SupportedSubprotocols) ->
{error, no_supported_subprotocol};
pick_subprotocol([Subprotocol | Rest], SupportedSubprotocols) ->
case lists:member(Subprotocol, SupportedSubprotocols) of
true ->
{ok, Subprotocol};
false ->
pick_subprotocol(Rest, SupportedSubprotocols)
end.
parse_header_fun_origin(Req, Opts) ->

View File

@ -146,7 +146,9 @@ t_call(_) ->
?assertEqual(Info, ?ws_conn:call(WsPid, info)).
t_init(_) ->
Opts = [{idle_timeout, 300000}],
Opts = [{idle_timeout, 300000},
{fail_if_no_subprotocol, false},
{supported_subprotocols, ["mqtt"]}],
WsOpts = #{compress => false,
deflate_opts => #{},
max_frame_size => infinity,