Merge branch 'dev/v4.3.0' into refact/avoid_anonymous_funcs

This commit is contained in:
JianBo He 2020-12-14 11:23:13 +08:00 committed by GitHub
commit 1b016b16f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
248 changed files with 1555 additions and 3304 deletions

2
.gitignore vendored
View File

@ -36,3 +36,5 @@ _checkouts
rebar.config.rendered rebar.config.rendered
/rebar3 /rebar3
rebar.lock rebar.lock
tmp/
_packages

1
.tool-versions Normal file
View File

@ -0,0 +1 @@
erlang 22.3.4.13

View File

@ -1,8 +1,11 @@
REBAR_VERSION = 3.14.3-emqx-1 REBAR_VERSION = 3.14.3-emqx-2
REBAR = ./rebar3 REBAR = ./rebar3
export PKG_VSN ?= $(shell git describe --tags --always)
# comma separated versions
export RELUP_BASE_VERSIONS ?=
PROFILE ?= emqx PROFILE ?= emqx
PROFILES := emqx emqx-edge PROFILES := emqx emqx-edge check test
PKG_PROFILES := emqx-pkg emqx-edge-pkg PKG_PROFILES := emqx-pkg emqx-edge-pkg
export REBAR_GIT_CLONE_OPTIONS += --depth=1 export REBAR_GIT_CLONE_OPTIONS += --depth=1
@ -19,17 +22,13 @@ ensure-rebar3:
$(REBAR): ensure-rebar3 $(REBAR): ensure-rebar3
.PHONY: xref .PHONY: eunit
xref: eunit: $(REBAR)
$(REBAR) xref $(REBAR) eunit
.PHONY: dialyzer .PHONY: ct
dialyzer: ct: $(REBAR)
$(REBAR) dialyzer $(REBAR) ct
.PHONY: distclean
distclean:
@rm -rf _build
.PHONY: $(PROFILES) .PHONY: $(PROFILES)
$(PROFILES:%=%): $(REBAR) $(PROFILES:%=%): $(REBAR)
@ -46,14 +45,10 @@ $(PROFILES:%=build-%): $(REBAR)
# rebar clean # rebar clean
.PHONY: clean $(PROFILES:%=clean-%) .PHONY: clean $(PROFILES:%=clean-%)
clean: $(PROFILES:%=clean-%) clean-stamps clean: $(PROFILES:%=clean-%)
$(PROFILES:%=clean-%): $(REBAR) $(PROFILES:%=clean-%): $(REBAR)
$(REBAR) as $(@:clean-%=%) clean $(REBAR) as $(@:clean-%=%) clean
.PHONY: clean-stamps
clean-stamps:
find -L _build -name '.stamp' -type f | xargs rm -f
.PHONY: deps-all .PHONY: deps-all
deps-all: $(REBAR) $(PROFILES:%=deps-%) $(PKG_PROFILES:%=deps-%) deps-all: $(REBAR) $(PROFILES:%=deps-%) $(PKG_PROFILES:%=deps-%)
@ -66,5 +61,13 @@ else
endif endif
$(REBAR) as $(@:deps-%=%) get-deps $(REBAR) as $(@:deps-%=%) get-deps
.PHONY: xref
xref: $(REBAR)
$(REBAR) as check xref
.PHONY: dialyzer
dialyzer: $(REBAR)
$(REBAR) as check dialyzer
include packages.mk include packages.mk
include docker.mk include docker.mk

View File

@ -40,18 +40,30 @@ Get the binary package of the corresponding OS from [EMQ X Download](https://www
The *EMQ X* broker requires Erlang/OTP R21+ to build since 3.0 release. The *EMQ X* broker requires Erlang/OTP R21+ to build since 3.0 release.
For 4.3 and later versions.
```bash
git clone https://github.com/emqx/emqx.git
cd emqx
make
_build/emqx/rel/emqx/bin console
``` ```
git clone -b v4.0.0 https://github.com/emqx/emqx-rel.git
cd emqx-rel && make For earlier versions, release has to be built from another repo.
cd _build/emqx/rel/emqx && ./bin/emqx console
```bash
git clone https://github.com/emqx/emqx-rel.git
cd emqx-rel
make
_build/emqx/rel/emqx/bin/emqx console
``` ```
## Quick Start ## Quick Start
``` If emqx is built from source, `cd _buid/emqx/rel/emqx`.
Or change to the installation root directory if emqx is installed from a release package.
```bash
# Start emqx # Start emqx
./bin/emqx start ./bin/emqx start
@ -64,6 +76,24 @@ cd _build/emqx/rel/emqx && ./bin/emqx console
To view the dashboard after running, use your browser to open: http://localhost:18083 To view the dashboard after running, use your browser to open: http://localhost:18083
## Test
### To test everything in one go
```
make eunit ct
```
### To run subset of the common tests
examples
```bash
./rebar3 ct --dir test,apps/emqx_sn,apps/emqx_coap
./rebar3 ct --suite test/emqx_SUITE.erl,apps/emqx_auth_http/test/emqx_auth_http_SUITE.erl
./rebar3 ct --suite test/emqx_SUITE.erl --testcase t_restart
```
## FAQ ## FAQ
Visiting [EMQ X FAQ](https://docs.emqx.io/broker/latest/en/faq/faq.html) to get help of common problems. Visiting [EMQ X FAQ](https://docs.emqx.io/broker/latest/en/faq/faq.html) to get help of common problems.

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_http_sup]}, {registered, [emqx_auth_http_sup]},
{applications, [kernel,stdlib,gproc,gun,emqx]}, {applications, [kernel,stdlib,gproc,gun]},
{mod, {emqx_auth_http_app, []}}, {mod, {emqx_auth_http_app, []}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -29,6 +29,10 @@
, feedvar/2 , feedvar/2
]). ]).
-type http_request() :: #http_request{method::'get' | 'post',params::[any()]}.
%-type http_opts() :: #{clientid:=_, peerhost:=_, protocol:=_, _=>_}.
%-type retry_opts() :: #{backoff:=_, interval:=_, times:=_, _=>_}.
%% Callbacks %% Callbacks
-export([ register_metrics/0 -export([ register_metrics/0
, check/3 , check/3
@ -80,7 +84,7 @@ authenticate(PoolName, #http_request{path = Path,
request_timeout = RequestTimeout}, ClientInfo) -> request_timeout = RequestTimeout}, ClientInfo) ->
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), RequestTimeout). request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), RequestTimeout).
-spec(is_superuser(atom(), maybe(#http_request{}), emqx_types:client()) -> boolean()). -spec(is_superuser(atom(), maybe(http_request()), emqx_types:client()) -> boolean()).
is_superuser(_PoolName, undefined, _ClientInfo) -> is_superuser(_PoolName, undefined, _ClientInfo) ->
false; false;
is_superuser(PoolName, #http_request{path = Path, is_superuser(PoolName, #http_request{path = Path,

View File

@ -36,8 +36,8 @@ start(_StartType, _StartArgs) ->
ok -> ok ->
{ok, PoolOpts} = application:get_env(?APP, pool_opts), {ok, PoolOpts} = application:get_env(?APP, pool_opts),
{ok, Sup} = emqx_http_client_sup:start_link(?APP, ssl(inet(PoolOpts))), {ok, Sup} = emqx_http_client_sup:start_link(?APP, ssl(inet(PoolOpts))),
with_env(auth_req, fun load_auth_hook/1), _ = with_env(auth_req, fun load_auth_hook/1),
with_env(acl_req, fun load_acl_hook/1), _ = with_env(acl_req, fun load_acl_hook/1),
{ok, Sup}; {ok, Sup};
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
@ -119,7 +119,7 @@ translate_env() ->
#{host := Host0, #{host := Host0,
port := Port, port := Port,
path := Path} = uri_string:parse(list_to_binary(URL)), path := Path} = uri_string:parse(list_to_binary(URL)),
{ok, Host} = inet:parse_address(binary_to_list(Host0)), Host = get_addr(binary_to_list(Host0)),
[{Name, {Host, Port, binary_to_list(Path)}} | Acc] [{Name, {Host, Port, binary_to_list(Path)}} | Acc]
end end
end, [], [acl_req, auth_req, super_req]), end, [], [acl_req, auth_req, super_req]),
@ -145,3 +145,16 @@ same_host_and_port([{_, {Host, Port, _}}, URL = {_, {Host, Port, _}} | Rest]) ->
same_host_and_port([URL | Rest]); same_host_and_port([URL | Rest]);
same_host_and_port(_) -> same_host_and_port(_) ->
false. false.
get_addr(Hostname) ->
case inet:parse_address(Hostname) of
{ok, {_,_,_,_} = Addr} -> Addr;
{ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr;
{error, einval} ->
case inet:getaddr(Hostname, inet) of
{error, _} ->
{ok, Addr} = inet:getaddr(Hostname, inet6),
Addr;
{ok, Addr} -> Addr
end
end.

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_jwt_sup]}, {registered, [emqx_auth_jwt_sup]},
{applications, [kernel,stdlib,jose,emqx]}, {applications, [kernel,stdlib,jose]},
{mod, {emqx_auth_jwt_app, []}}, {mod, {emqx_auth_jwt_app, []}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -1,10 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -33,11 +33,10 @@ start(_Type, _Args) ->
{ok, Pid} = start_auth_server(jwks_svr_options()), {ok, Pid} = start_auth_server(jwks_svr_options()),
ok = emqx_auth_jwt:register_metrics(), ok = emqx_auth_jwt:register_metrics(),
AuthEnv0 = auth_env(), AuthEnv0 = auth_env(),
AuthEnv1 = AuthEnv0#{pid => Pid}, AuthEnv1 = AuthEnv0#{pid => Pid},
emqx:hook('client.authenticate', {emqx_auth_jwt, check, [AuthEnv1]}), _ = emqx:hook('client.authenticate', {emqx_auth_jwt, check, [AuthEnv1]}),
{ok, Sup, AuthEnv1}. {ok, Sup, AuthEnv1}.
stop(AuthEnv) -> stop(AuthEnv) ->

View File

@ -176,7 +176,7 @@ reset_timer(State = #state{intv = Intv}) ->
cancel_timer(State = #state{tref = undefined}) -> cancel_timer(State = #state{tref = undefined}) ->
State; State;
cancel_timer(State = #state{tref = TRef}) -> cancel_timer(State = #state{tref = TRef}) ->
erlang:cancel_timer(TRef), _ = erlang:cancel_timer(TRef),
State#state{tref = undefined}. State#state{tref = undefined}.
do_verify(_JwsCompated, []) -> do_verify(_JwsCompated, []) ->

View File

@ -0,0 +1,26 @@
version: '3'
services:
erlang:
image: erlang:22.1
volumes:
- ../:/emqx_auth_ldap
networks:
- emqx_bridge
depends_on:
- ldap_server
tty: true
ldap_server:
build: ./emqx-ldap
image: emqx-ldap:1.0
restart: always
ports:
- 389:389
- 636:636
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,26 @@
FROM buildpack-deps:stretch
ENV VERSION=2.4.50
RUN apt-get update && apt-get install -y groff groff-base
RUN wget ftp://ftp.openldap.org/pub/OpenLDAP/openldap-release/openldap-${VERSION}.tgz \
&& gunzip -c openldap-${VERSION}.tgz | tar xvfB - \
&& cd openldap-${VERSION} \
&& ./configure && make depend && make && make install \
&& cd .. && rm -rf openldap-${VERSION}
COPY ./slapd.conf /usr/local/etc/openldap/slapd.conf
COPY ./emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif
COPY ./emqx.schema /usr/local/etc/openldap/schema/emqx.schema
COPY ./*.pem /usr/local/etc/openldap/
RUN mkdir -p /usr/local/etc/openldap/data \
&& slapadd -l /usr/local/etc/openldap/schema/emqx.io.ldif -f /usr/local/etc/openldap/slapd.conf
WORKDIR /usr/local/etc/openldap
EXPOSE 389 636
ENTRYPOINT ["/usr/local/libexec/slapd", "-h", "ldap:/// ldaps:///", "-d", "3", "-f", "/usr/local/etc/openldap/slapd.conf"]
CMD []

View File

@ -0,0 +1,16 @@
include /usr/local/etc/openldap/schema/core.schema
include /usr/local/etc/openldap/schema/cosine.schema
include /usr/local/etc/openldap/schema/inetorgperson.schema
include /usr/local/etc/openldap/schema/ppolicy.schema
include /usr/local/etc/openldap/schema/emqx.schema
TLSCACertificateFile /usr/local/etc/openldap/cacert.pem
TLSCertificateFile /usr/local/etc/openldap/cert.pem
TLSCertificateKeyFile /usr/local/etc/openldap/key.pem
database bdb
suffix "dc=emqx,dc=io"
rootdn "cn=root,dc=emqx,dc=io"
rootpw {SSHA}eoF7NhNrejVYYyGHqnt+MdKNBh4r1w3W
directory /usr/local/etc/openldap/data

View File

@ -30,11 +30,11 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_auth_ldap_sup:start_link(), {ok, Sup} = emqx_auth_ldap_sup:start_link(),
if_enabled([device_dn, match_objectclass, _ = if_enabled([device_dn, match_objectclass,
username_attr, password_attr, username_attr, password_attr,
filters, custom_base_dn, bind_as_user], filters, custom_base_dn, bind_as_user],
fun load_auth_hook/1), fun load_auth_hook/1),
if_enabled([device_dn, match_objectclass, _ = if_enabled([device_dn, match_objectclass,
username_attr, password_attr, username_attr, password_attr,
filters, custom_base_dn, bind_as_user], filters, custom_base_dn, bind_as_user],
fun load_acl_hook/1), fun load_acl_hook/1),
@ -60,8 +60,7 @@ load_acl_hook(DeviceDn) ->
if_enabled(Cfgs, Fun) -> if_enabled(Cfgs, Fun) ->
case get_env(Cfgs) of case get_env(Cfgs) of
{ok, InitArgs} -> Fun(InitArgs); {ok, InitArgs} -> Fun(InitArgs)
[] -> ok
end. end.
get_env(Cfgs) -> get_env(Cfgs) ->

View File

@ -16,8 +16,8 @@
-module(emqx_auth_ldap_bind_as_user_SUITE). -module(emqx_auth_ldap_bind_as_user_SUITE).
-compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-compile(no_warning_export).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").

View File

@ -202,12 +202,6 @@ do_validation(login, {clientid, V}) when is_binary(V)
do_validation(login, {username, V}) when is_binary(V) do_validation(login, {username, V}) when is_binary(V)
andalso byte_size(V) > 0-> andalso byte_size(V) > 0->
true; true;
do_validation(clientid, V) when is_binary(V)
andalso byte_size(V) > 0 ->
true;
do_validation(username, V) when is_binary(V)
andalso byte_size(V) > 0 ->
true;
do_validation(topic, V) when is_binary(V) do_validation(topic, V) when is_binary(V)
andalso byte_size(V) > 0 -> andalso byte_size(V) > 0 ->
true; true;

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually {vsn, "4.3.0"}, % strict semver, bump manually
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel,stdlib,mnesia,emqx]}, {applications, [kernel,stdlib,mnesia]},
{mod, {emqx_auth_mnesia_app,[]}}, {mod, {emqx_auth_mnesia_app,[]}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -37,9 +37,9 @@ init(#{clientid_list := ClientidList, username_list := UsernameList}) ->
{disc_copies, [node()]}, {disc_copies, [node()]},
{attributes, record_info(fields, emqx_user)}, {attributes, record_info(fields, emqx_user)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]), {storage_properties, [{ets, [{read_concurrency, true}]}]}]),
[ add_default_user({{clientid, iolist_to_binary(Clientid)}, iolist_to_binary(Password)}) _ = [ add_default_user({{clientid, iolist_to_binary(Clientid)}, iolist_to_binary(Password)})
|| {Clientid, Password} <- ClientidList], || {Clientid, Password} <- ClientidList],
[ add_default_user({{username, iolist_to_binary(Username)}, iolist_to_binary(Password)}) _ = [ add_default_user({{username, iolist_to_binary(Username)}, iolist_to_binary(Password)})
|| {Username, Password} <- UsernameList], || {Username, Password} <- UsernameList],
ok = ekka_mnesia:copy_table(emqx_user, disc_copies). ok = ekka_mnesia:copy_table(emqx_user, disc_copies).
@ -63,10 +63,8 @@ check(ClientInfo = #{ clientid := Clientid
emqx_metrics:inc(?AUTH_METRICS(ignore)), emqx_metrics:inc(?AUTH_METRICS(ignore)),
ok; ok;
List -> List ->
case [ Hash || <<Salt:4/binary, Hash/binary>> <- lists:sort(fun emqx_auth_mnesia_cli:comparing/2, List), case match_password(NPassword, HashType, List) of
Hash =:= hash(NPassword, Salt, HashType) false ->
] of
[] ->
?LOG(error, "[Mnesia] Auth from mnesia failed: ~p", [ClientInfo]), ?LOG(error, "[Mnesia] Auth from mnesia failed: ~p", [ClientInfo]),
emqx_metrics:inc(?AUTH_METRICS(failure)), emqx_metrics:inc(?AUTH_METRICS(failure)),
{stop, AuthResult#{anonymous => false, auth_result => password_error}}; {stop, AuthResult#{anonymous => false, auth_result => password_error}};
@ -78,7 +76,34 @@ check(ClientInfo = #{ clientid := Clientid
description() -> "Authentication with Mnesia". description() -> "Authentication with Mnesia".
match_password(Password, HashType, HashList) ->
lists:any(
fun(Secret) ->
case is_salt_hash(Secret, HashType) of
true ->
<<Salt:4/binary, Hash/binary>> = Secret,
Hash =:= hash(Password, Salt, HashType);
_ ->
Secret =:= hash(Password, HashType)
end
end, HashList).
hash(undefined, HashType) ->
hash(<<>>, HashType);
hash(Password, HashType) ->
emqx_passwd:hash(HashType, Password).
hash(undefined, SaltBin, HashType) -> hash(undefined, SaltBin, HashType) ->
hash(<<>>, SaltBin, HashType); hash(<<>>, SaltBin, HashType);
hash(Password, SaltBin, HashType) -> hash(Password, SaltBin, HashType) ->
emqx_passwd:hash(HashType, <<SaltBin/binary, Password/binary>>). emqx_passwd:hash(HashType, <<SaltBin/binary, Password/binary>>).
is_salt_hash(_, plain) ->
true;
is_salt_hash(Secret, HashType) ->
not (byte_size(Secret) == len(HashType)).
len(md5) -> 32;
len(sha) -> 40;
len(sha256) -> 64;
len(sha512) -> 128.

View File

@ -221,7 +221,9 @@ paginate(Tables, MatchSpec, Params, ComparingFun, RowFun) ->
Limit = limit(Params), Limit = limit(Params),
Cursor = qlc:cursor(Qh), Cursor = qlc:cursor(Qh),
case Page > 1 of case Page > 1 of
true -> qlc:next_answers(Cursor, (Page - 1) * Limit); true ->
_ = qlc:next_answers(Cursor, (Page - 1) * Limit),
ok;
false -> ok false -> ok
end, end,
Rows = qlc:next_answers(Cursor, Limit), Rows = qlc:next_answers(Cursor, Limit),
@ -263,14 +265,6 @@ limit(Params) ->
%% Interval Funcs %% Interval Funcs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
format({?TABLE, {clientid, ClientId}, Password, _InterTime}) ->
#{clientid => ClientId,
password => Password};
format({?TABLE, {username, Username}, Password, _InterTime}) ->
#{username => Username,
password => Password};
format([{?TABLE, {clientid, ClientId}, Password, _InterTime}]) -> format([{?TABLE, {clientid, ClientId}, Password, _InterTime}]) ->
#{clientid => ClientId, #{clientid => ClientId,
password => Password}; password => Password};

View File

@ -38,8 +38,8 @@ start(_StartType, _StartArgs) ->
emqx_ctl:register_command(username, {emqx_auth_mnesia_cli, auth_username_cli}, []), emqx_ctl:register_command(username, {emqx_auth_mnesia_cli, auth_username_cli}, []),
emqx_ctl:register_command(user, {emqx_auth_mnesia_cli, auth_username_cli}, []), emqx_ctl:register_command(user, {emqx_auth_mnesia_cli, auth_username_cli}, []),
emqx_ctl:register_command(acl, {emqx_acl_mnesia_cli, cli}, []), emqx_ctl:register_command(acl, {emqx_acl_mnesia_cli, cli}, []),
load_auth_hook(), _ = load_auth_hook(),
load_acl_hook(), _ = load_acl_hook(),
{ok, Sup}. {ok, Sup}.
prep_stop(State) -> prep_stop(State) ->

View File

@ -68,13 +68,8 @@ do_update_user(User = #emqx_user{login = Login}) ->
-spec(lookup_user(tuple()) -> list()). -spec(lookup_user(tuple()) -> list()).
lookup_user(undefined) -> []; lookup_user(undefined) -> [];
lookup_user(Login) -> lookup_user(Login) ->
case mnesia:dirty_read(?TABLE, Login) of Re = mnesia:dirty_read(?TABLE, Login),
{error, Reason} -> lists:sort(fun comparing/2, Re).
?LOG(error, "[Mnesia] do_check_user error: ~p~n", [Reason]),
[];
Re ->
lists:sort(fun comparing/2, Re)
end.
%% @doc Remove user %% @doc Remove user
-spec(remove_user(tuple()) -> ok | {error, any()}). -spec(remove_user(tuple()) -> ok | {error, any()}).
@ -88,7 +83,6 @@ all_users() -> mnesia:dirty_all_keys(?TABLE).
all_users(clientid) -> all_users(clientid) ->
MatchSpec = ets:fun2ms(fun({?TABLE, {clientid, Clientid}, Password, CreatedAt}) -> {?TABLE, {clientid, Clientid}, Password, CreatedAt} end), MatchSpec = ets:fun2ms(fun({?TABLE, {clientid, Clientid}, Password, CreatedAt}) -> {?TABLE, {clientid, Clientid}, Password, CreatedAt} end),
lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec)); lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec));
all_users(username) -> all_users(username) ->
MatchSpec = ets:fun2ms(fun({?TABLE, {username, Username}, Password, CreatedAt}) -> {?TABLE, {username, Username}, Password, CreatedAt} end), MatchSpec = ets:fun2ms(fun({?TABLE, {username, Username}, Password, CreatedAt}) -> {?TABLE, {username, Username}, Password, CreatedAt} end),
lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec)). lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec)).
@ -167,7 +161,6 @@ auth_username_cli(["update", Username, NewPassword]) ->
ok -> emqx_ctl:print("ok~n"); ok -> emqx_ctl:print("ok~n");
{error, Reason} -> emqx_ctl:print("Error: ~p~n", [Reason]) {error, Reason} -> emqx_ctl:print("Error: ~p~n", [Reason])
end; end;
auth_username_cli(["del", Username]) -> auth_username_cli(["del", Username]) ->
case remove_user({username, iolist_to_binary(Username)}) of case remove_user({username, iolist_to_binary(Username)}) of
ok -> emqx_ctl:print("ok~n"); ok -> emqx_ctl:print("ok~n");

View File

@ -16,6 +16,7 @@
-module(emqx_acl_mnesia_SUITE). -module(emqx_acl_mnesia_SUITE).
-compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-include("emqx_auth_mnesia.hrl"). -include("emqx_auth_mnesia.hrl").

View File

@ -13,6 +13,7 @@
-module(emqx_auth_mnesia_SUITE). -module(emqx_auth_mnesia_SUITE).
-compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-include("emqx_auth_mnesia.hrl"). -include("emqx_auth_mnesia.hrl").

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_mongo_sup]}, {registered, [emqx_auth_mongo_sup]},
{applications, [kernel,stdlib,mongodb,ecpool,emqx]}, {applications, [kernel,stdlib,mongodb,ecpool]},
{mod, {emqx_auth_mongo_app,[]}}, {mod, {emqx_auth_mongo_app,[]}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -82,15 +82,19 @@ description() -> "Authentication with MongoDB".
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Is Superuser? %% Is Superuser?
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(is_superuser(string(), maybe(#superquery{}), emqx_types:clientinfo()) -> boolean()).
is_superuser(_Pool, undefined, _ClientInfo) -> is_superuser(_Pool, undefined, _ClientInfo) ->
false; false;
is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Selector}, ClientInfo) -> is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Selector}, ClientInfo) ->
Row = query(Pool, Coll, maps:from_list(replvars(Selector, ClientInfo))), case query(Pool, Coll, maps:from_list(replvars(Selector, ClientInfo))) of
undefined -> false;
{error, Reason} ->
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
false;
Row ->
case maps:get(Field, Row, false) of case maps:get(Field, Row, false) of
true -> true; true -> true;
_False -> false _False -> false
end
end. end.
replvars(VarList, ClientInfo) -> replvars(VarList, ClientInfo) ->

View File

@ -1,9 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -36,8 +36,8 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_auth_mysql_sup:start_link(), {ok, Sup} = emqx_auth_mysql_sup:start_link(),
if_enabled(auth_query, fun load_auth_hook/1), _ = if_enabled(auth_query, fun load_auth_hook/1),
if_enabled(acl_query, fun load_acl_hook/1), _ = if_enabled(acl_query, fun load_acl_hook/1),
{ok, Sup}. {ok, Sup}.

View File

@ -0,0 +1,30 @@
version: '3'
services:
erlang:
image: erlang:22.3
volumes:
- ../:/emqx_auth_pgsql
networks:
- emqx_bridge
depends_on:
- pgsql_server
tty: true
pgsql_server:
build:
context: ./pgsql
args:
BUILD_FROM: postgres:${PGSQL_TAG}
image: emqx-pgsql
restart: always
environment:
POSTGRES_PASSWORD: public
POSTGRES_USER: root
POSTGRES_DB: mqtt
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,8 @@
ARG BUILD_FROM=postgres:11
FROM ${BUILD_FROM}
COPY pg.conf /etc/postgresql/postgresql.conf
COPY server-cert.pem /etc/postgresql/server-cert.pem
COPY server-key.pem /etc/postgresql/server-key.pem
RUN chown -R postgres:postgres /etc/postgresql \
&& chmod 600 /etc/postgresql/*.pem
CMD ["-c", "config_file=/etc/postgresql/postgresql.conf"]

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_pgsql_sup]}, {registered, [emqx_auth_pgsql_sup]},
{applications, [kernel,stdlib,epgsql,ecpool,emqx]}, {applications, [kernel,stdlib,epgsql,ecpool]},
{mod, {emqx_auth_pgsql_app,[]}}, {mod, {emqx_auth_pgsql_app,[]}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -1,9 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -29,6 +29,8 @@
, equery/3 , equery/3
]). ]).
-type client_info() :: #{username:=_, clientid:=_, peerhost:=_, _=>_}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Avoid SQL Injection: Parse SQL to Parameter Query. %% Avoid SQL Injection: Parse SQL to Parameter Query.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -62,9 +64,6 @@ connect(Opts) ->
{ok, C} -> {ok, C} ->
conn_post(C), conn_post(C),
{ok, C}; {ok, C};
{error, Reason = econnrefused} ->
?LOG(error, "[Postgres] Can't connect to Postgres server: Connection refused."),
{error, Reason};
{error, Reason = invalid_authorization_specification} -> {error, Reason = invalid_authorization_specification} ->
?LOG(error, "[Postgres] Can't connect to Postgres server: Invalid authorization specification."), ?LOG(error, "[Postgres] Can't connect to Postgres server: Invalid authorization specification."),
{error, Reason}; {error, Reason};
@ -104,9 +103,11 @@ conn_opts([Opt = {ssl_opts, _}|Opts], Acc) ->
conn_opts([_Opt|Opts], Acc) -> conn_opts([_Opt|Opts], Acc) ->
conn_opts(Opts, Acc). conn_opts(Opts, Acc).
-spec(equery(atom(), string() | epgsql:statement(), Parameters::[any()]) -> {ok, ColumnsDescription :: [any()], RowsValues :: [any()]} | {error, any()} ).
equery(Pool, Sql, Params) -> equery(Pool, Sql, Params) ->
ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, Params) end). ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, Params) end).
-spec(equery(atom(), string() | epgsql:statement(), Parameters::[any()], client_info()) -> {ok, ColumnsDescription :: [any()], RowsValues :: [any()]} | {error, any()} ).
equery(Pool, Sql, Params, ClientInfo) -> equery(Pool, Sql, Params, ClientInfo) ->
ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, replvar(Params, ClientInfo)) end). ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, replvar(Params, ClientInfo)) end).

View File

@ -16,6 +16,7 @@
-module(emqx_auth_pgsql_SUITE). -module(emqx_auth_pgsql_SUITE).
-compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-define(POOL, emqx_auth_pgsql). -define(POOL, emqx_auth_pgsql).

View File

@ -0,0 +1,39 @@
version: '2.4'
# network configuration is limited in version 3
# https://github.com/docker/compose/issues/4958
services:
erlang:
image: erlang:22.3
volumes:
- ../:/emqx_auth_redis
networks:
- app_net
depends_on:
- redis_cluster
tty: true
redis_cluster:
image: redis:6.0.9
container_name: redis-cluster
volumes:
- ../test/emqx_auth_redis_SUITE_data/certs:/tls
- ./redis/:/data/conf
command: bash -c "/bin/bash /data/conf/redis.sh -t && while true; do echo 1; sleep 1; done"
networks:
app_net:
# Assign a public address. Erlang container cannot find cluster nodes by network-scoped alias (redis_cluster).
ipv4_address: 172.16.239.10
ipv6_address: 2001:3200:3200::20
networks:
app_net:
driver: bridge
enable_ipv6: true
ipam:
driver: default
config:
- subnet: 172.16.239.0/24
gateway: 172.16.239.1
- subnet: 2001:3200:3200::/64
gateway: 2001:3200:3200::1

View File

@ -0,0 +1,38 @@
version: '2.4'
# network configuration is limited in version 3
# https://github.com/docker/compose/issues/4958
services:
erlang:
image: erlang:22.3
volumes:
- ../:/emqx_auth_redis
networks:
- app_net
depends_on:
- redis_cluster
tty: true
redis_cluster:
image: redis:${REDIS_TAG}
container_name: redis-cluster
volumes:
- ./redis/:/data/conf
command: bash -c "/bin/bash /data/conf/redis.sh && while true; do echo 1; sleep 1; done"
networks:
app_net:
# Assign a public address. Erlang container cannot find cluster nodes by network-scoped alias (redis_cluster).
ipv4_address: 172.16.239.10
ipv6_address: 2001:3200:3200::20
networks:
app_net:
driver: bridge
enable_ipv6: true
ipam:
driver: default
config:
- subnet: 172.16.239.0/24
gateway: 172.16.239.1
- subnet: 2001:3200:3200::/64
gateway: 2001:3200:3200::1

View File

@ -0,0 +1,31 @@
version: '3'
services:
erlang:
image: erlang:22.3
volumes:
- ../:/emqx_auth_redis
networks:
- emqx_bridge
depends_on:
- redis_server
tty: true
redis_server:
image: redis:6.0.9
volumes:
- ../test/emqx_auth_redis_SUITE_data/certs:/tls
command:
- redis-server
- "--bind 0.0.0.0 ::"
- --tls-port 6380
- --tls-cert-file /tls/redis.crt
- --tls-key-file /tls/redis.key
- --tls-ca-cert-file /tls/ca.crt
restart: always
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,25 @@
version: '3'
services:
erlang:
image: erlang:22.3
volumes:
- ../:/emqx_auth_redis
networks:
- emqx_bridge
depends_on:
- redis_server
tty: true
redis_server:
image: redis:${REDIS_TAG}
command:
- redis-server
- "--bind 0.0.0.0 ::"
restart: always
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,3 @@
cluster-enabled yes
cluster-node-timeout 10000
bind 0.0.0.0 ::

View File

@ -0,0 +1,71 @@
#!/bin/bash
tls=false;
while getopts t OPT
do
case $OPT in
t) tls=true
;;
\?) exit
;;
esac
done
rm -f \
/data/conf/r7000i.log \
/data/conf/r7001i.log \
/data/conf/r7002i.log \
/data/conf/nodes.7000.conf \
/data/conf/nodes.7001.conf \
/data/conf/nodes.7002.conf ;
if $tls ; then
redis-server /data/conf/redis.conf --port 7000 --cluster-config-file /data/conf/nodes.7000.conf --daemonize yes \
--tls-port 8000 \
--tls-cert-file /tls/redis.crt \
--tls-key-file /tls/redis.key \
--tls-ca-cert-file /tls/ca.crt
redis-server /data/conf/redis.conf --port 7001 --cluster-config-file /data/conf/nodes.7001.conf --daemonize yes \
--tls-port 8001 \
--tls-cert-file /tls/redis.crt \
--tls-key-file /tls/redis.key \
--tls-ca-cert-file /tls/ca.crt
redis-server /data/conf/redis.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf --daemonize yes \
--tls-port 8002 \
--tls-cert-file /tls/redis.crt \
--tls-key-file /tls/redis.key \
--tls-ca-cert-file /tls/ca.crt
else
redis-server /data/conf/redis.conf --port 7000 --cluster-config-file /data/conf/nodes.7000.conf --daemonize yes ;
redis-server /data/conf/redis.conf --port 7001 --cluster-config-file /data/conf/nodes.7001.conf --daemonize yes ;
redis-server /data/conf/redis.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf --daemonize yes ;
fi
REDIS_LOAD_FLG=true;
while $REDIS_LOAD_FLG;
do
sleep 1;
redis-cli -p 7000 info 1> /data/conf/r7000i.log 2> /dev/null;
if [ -s /data/conf/r7000i.log ]; then
:
else
continue;
fi
redis-cli -p 7001 info 1> /data/conf/r7001i.log 2> /dev/null;
if [ -s /data/conf/r7001i.log ]; then
:
else
continue;
fi
redis-cli -p 7002 info 1> /data/conf/r7002i.log 2> /dev/null;
if [ -s /data/conf/r7002i.log ]; then
:
else
continue;
fi
yes "yes" | redis-cli --cluster create 172.16.239.10:7000 172.16.239.10:7001 172.16.239.10:7002;
REDIS_LOAD_FLG=false;
done
exit 0;

View File

@ -24,3 +24,5 @@ erlang.mk
rebar.lock rebar.lock
/.idea/ /.idea/
.DS_Store .DS_Store
/.ci/redis/nodes.*.conf
/.ci/redis/*.log

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_redis_sup]}, {registered, [emqx_auth_redis_sup]},
{applications, [kernel,stdlib,eredis,eredis_cluster,ecpool,emqx]}, {applications, [kernel,stdlib,eredis,eredis_cluster,ecpool]},
{mod, {emqx_auth_redis_app, []}}, {mod, {emqx_auth_redis_app, []}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -1,10 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -28,8 +28,8 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_auth_redis_sup:start_link(), {ok, Sup} = emqx_auth_redis_sup:start_link(),
if_cmd_enabled(auth_cmd, fun load_auth_hook/1), _ = if_cmd_enabled(auth_cmd, fun load_auth_hook/1),
if_cmd_enabled(acl_cmd, fun load_acl_hook/1), _ = if_cmd_enabled(acl_cmd, fun load_acl_hook/1),
{ok, Sup}. {ok, Sup}.
stop(_State) -> stop(_State) ->

View File

@ -38,7 +38,7 @@ connect(Opts) ->
Host = case Sentinel =:= "" of Host = case Sentinel =:= "" of
true -> get_value(host, Opts); true -> get_value(host, Opts);
false -> false ->
eredis_sentinel:start_link(get_value(servers, Opts)), _ = eredis_sentinel:start_link(get_value(servers, Opts)),
"sentinel:" ++ Sentinel "sentinel:" ++ Sentinel
end, end,
case eredis:start_link(Host, case eredis:start_link(Host,

View File

@ -16,6 +16,7 @@
-module(emqx_auth_redis_SUITE). -module(emqx_auth_redis_SUITE).
-compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
@ -69,21 +70,18 @@ set_special_configs(_App) ->
ok. ok.
init_redis_rows() -> init_redis_rows() ->
{ok, Connection} = ?POOL(?APP),
%% Users %% Users
[eredis:q(Connection, ["HMSET", Key|FiledValue]) || {Key, FiledValue} <- ?INIT_AUTH], [q(["HMSET", Key|FiledValue]) || {Key, FiledValue} <- ?INIT_AUTH],
%% ACLs %% ACLs
emqx_modules:load_module(emqx_mod_acl_internal, false), emqx_modules:load_module(emqx_mod_acl_internal, false),
Result = [eredis:q(Connection, ["HSET", Key, Filed, Value]) || {Key, Filed, Value} <- ?INIT_ACL], Result = [q(["HSET", Key, Filed, Value]) || {Key, Filed, Value} <- ?INIT_ACL],
ct:pal("redis init result: ~p~n", [Result]). ct:pal("redis init result: ~p~n", [Result]).
deinit_redis_rows() -> deinit_redis_rows() ->
{ok, Connection} = ?POOL(?APP),
AuthKeys = [Key || {Key, _Filed, _Value} <- ?INIT_AUTH], AuthKeys = [Key || {Key, _Filed, _Value} <- ?INIT_AUTH],
AclKeys = [Key || {Key, _Value} <- ?INIT_ACL], AclKeys = [Key || {Key, _Value} <- ?INIT_ACL],
eredis:q(Connection, ["DEL" | AuthKeys]), q(["DEL" | AuthKeys]),
eredis:q(Connection, ["DEL" | AclKeys]). q(["DEL" | AclKeys]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Cases %% Cases
@ -121,9 +119,8 @@ t_check_auth(_) ->
{error, _} = emqx_access_control:authenticate(Bcrypt#{password => <<"password">>}). {error, _} = emqx_access_control:authenticate(Bcrypt#{password => <<"password">>}).
t_check_auth_hget(_) -> t_check_auth_hget(_) ->
{ok, Connection} = ?POOL(?APP), q(["HSET", "mqtt_user:hset", "password", "hset"]),
eredis:q(Connection, ["HSET", "mqtt_user:hset", "password", "hset"]), q(["HSET", "mqtt_user:hset", "is_superuser", "1"]),
eredis:q(Connection, ["HSET", "mqtt_user:hset", "is_superuser", "1"]),
reload([{password_hash, plain}, {auth_cmd, "HGET mqtt_user:%u password"}]), reload([{password_hash, plain}, {auth_cmd, "HGET mqtt_user:%u password"}]),
Hset = #{clientid => <<"hset">>, username => <<"hset">>, zone => external}, Hset = #{clientid => <<"hset">>, username => <<"hset">>, zone => external},
{ok, #{is_superuser := true}} = emqx_access_control:authenticate(Hset#{password => <<"hset">>}). {ok, #{is_superuser := true}} = emqx_access_control:authenticate(Hset#{password => <<"hset">>}).
@ -164,6 +161,16 @@ t_acl_super(_) ->
end, end,
emqtt:disconnect(C). emqtt:disconnect(C).
t_check_cluster_connection(_) ->
?assertMatch({error, _Reason}, reload([{server, [{type,cluster},
{pool_size,8},
{auto_reconnect,1},
{database,0},
{password,[]},
{sentinel,[]},
{servers,[{"wrong",6379},{"wrong",6380},{"wrong",6381}]}]}])).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -172,3 +179,13 @@ reload(Config) when is_list(Config) ->
application:stop(?APP), application:stop(?APP),
[application:set_env(?APP, K, V) || {K, V} <- Config], [application:set_env(?APP, K, V) || {K, V} <- Config],
application:start(?APP). application:start(?APP).
q(Cmd) ->
{ok, Server} = application:get_env(?APP, server),
case proplists:get_value(type, Server) of
single ->
{ok, Connection} = ?POOL(?APP),
eredis:q(Connection, Cmd);
cluster ->
eredis_cluster:q(emqx_auth_redis, Cmd)
end.

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel,stdlib,replayq,emqtt,emqx]}, {applications, [kernel,stdlib,replayq,emqtt]},
{mod, {emqx_bridge_mqtt_app, []}}, {mod, {emqx_bridge_mqtt_app, []}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -1,10 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<"*.">>, []}
]
}.

View File

@ -186,6 +186,5 @@ replvar([Key|More], Options) ->
%% ${node} => node() %% ${node} => node()
feedvar(clientid, ClientId, _) -> feedvar(clientid, ClientId, _) ->
iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node()))); iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node()))).
feedvar(_, Val, _) ->
Val.

View File

@ -56,16 +56,12 @@ cli(["forwards", Name]) ->
end, emqx_bridge_worker:get_forwards(Name)); end, emqx_bridge_worker:get_forwards(Name));
cli(["add-forward", Name, Topic]) -> cli(["add-forward", Name, Topic]) ->
case emqx_bridge_worker:ensure_forward_present(Name, iolist_to_binary(Topic)) of ok = emqx_bridge_worker:ensure_forward_present(Name, iolist_to_binary(Topic)),
ok -> emqx_ctl:print("Add-forward topic successfully.~n"); emqx_ctl:print("Add-forward topic successfully.~n");
{error, Reason} -> emqx_ctl:print("Add-forward failed reason: ~p.~n", [Reason])
end;
cli(["del-forward", Name, Topic]) -> cli(["del-forward", Name, Topic]) ->
case emqx_bridge_worker:ensure_forward_absent(Name, iolist_to_binary(Topic)) of ok = emqx_bridge_worker:ensure_forward_absent(Name, iolist_to_binary(Topic)),
ok -> emqx_ctl:print("Del-forward topic successfully.~n"); emqx_ctl:print("Del-forward topic successfully.~n");
{error, Reason} -> emqx_ctl:print("Del-forward failed reason: ~p.~n", [Reason])
end;
cli(["subscriptions", Name]) -> cli(["subscriptions", Name]) ->
foreach(fun({Topic, Qos}) -> foreach(fun({Topic, Qos}) ->
@ -79,10 +75,8 @@ cli(["add-subscription", Name, Topic, Qos]) ->
end; end;
cli(["del-subscription", Name, Topic]) -> cli(["del-subscription", Name, Topic]) ->
case emqx_bridge_worker:ensure_subscription_absent(Name, Topic) of ok = emqx_bridge_worker:ensure_subscription_absent(Name, Topic),
ok -> emqx_ctl:print("Del-subscription topic successfully.~n"); emqx_ctl:print("Del-subscription topic successfully.~n");
{error, Reason} -> emqx_ctl:print("Del-subscription failed reason: ~p.~n", [Reason])
end;
cli(_) -> cli(_) ->
emqx_ctl:usage([{"bridges list", "List bridges"}, emqx_ctl:usage([{"bridges list", "List bridges"},

View File

@ -33,6 +33,7 @@
-type ack_ref() :: emqx_bridge_worker:ack_ref(). -type ack_ref() :: emqx_bridge_worker:ack_ref().
-type batch() :: emqx_bridge_worker:batch(). -type batch() :: emqx_bridge_worker:batch().
-type node_or_tuple() :: atom() | {atom(), term()}.
-define(HEARTBEAT_INTERVAL, timer:seconds(1)). -define(HEARTBEAT_INTERVAL, timer:seconds(1)).
@ -61,7 +62,7 @@ stop(#{client_pid := Pid}) when is_pid(Pid) ->
ok. ok.
%% @doc Callback for `emqx_bridge_connect' behaviour %% @doc Callback for `emqx_bridge_connect' behaviour
-spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}. -spec send(#{address:=node_or_tuple(), _=>_}, batch()) -> {ok, ack_ref()} | {error, any()}.
send(#{address := Remote}, Batch) -> send(#{address := Remote}, Batch) ->
case ?RPC:call(Remote, ?MODULE, handle_send, [Batch]) of case ?RPC:call(Remote, ?MODULE, handle_send, [Batch]) of
ok -> ok ->

View File

@ -307,7 +307,7 @@ idle({call, From}, ensure_started, State) ->
case do_connect(State) of case do_connect(State) of
{ok, State1} -> {ok, State1} ->
{next_state, connected, State1, [{reply, From, ok}, {state_timeout, 0, connected}]}; {next_state, connected, State1, [{reply, From, ok}, {state_timeout, 0, connected}]};
{error, Reason} -> {error, Reason, _State} ->
{keep_state_and_data, [{reply, From, {error, Reason}}]} {keep_state_and_data, [{reply, From, {error, Reason}}]}
end; end;
%% @doc Standing by for manual start. %% @doc Standing by for manual start.
@ -320,12 +320,8 @@ idle(state_timeout, reconnect, State) ->
connecting(State); connecting(State);
idle(info, {batch_ack, Ref}, State) -> idle(info, {batch_ack, Ref}, State) ->
case do_ack(State, Ref) of {ok, NewState} = do_ack(State, Ref),
{ok, NewState} ->
{keep_state, NewState}; {keep_state, NewState};
_ ->
keep_state_and_data
end;
idle(Type, Content, State) -> idle(Type, Content, State) ->
common(idle, Type, Content, State). common(idle, Type, Content, State).
@ -359,12 +355,8 @@ connected(info, {disconnected, Conn, Reason},
keep_state_and_data keep_state_and_data
end; end;
connected(info, {batch_ack, Ref}, State) -> connected(info, {batch_ack, Ref}, State) ->
case do_ack(State, Ref) of {ok, NewState} = do_ack(State, Ref),
{ok, NewState} ->
{keep_state, NewState, {next_event, internal, maybe_send}}; {keep_state, NewState, {next_event, internal, maybe_send}};
_ ->
keep_state_and_data
end;
connected(Type, Content, State) -> connected(Type, Content, State) ->
common(connected, Type, Content, State). common(connected, Type, Content, State).

View File

@ -124,39 +124,6 @@ manual_start_stop_test() ->
emqx_bridge_worker:ensure_stopped(?BRIDGE_REG_NAME), emqx_bridge_worker:ensure_stopped(?BRIDGE_REG_NAME),
emqx_metrics:stop(). emqx_metrics:stop().
%% Feed messages to bridge
sender_loop(_Pid, [], _) -> exit(normal);
sender_loop(Pid, [Num | Rest], Interval) ->
random_sleep(Interval),
Pid ! {deliver, dummy, make_msg(Num)},
sender_loop(Pid, Rest, Interval).
%% Feed acknowledgments to bridge
receiver_loop(_Pid, [], _) -> ok;
receiver_loop(Pid, Nums, Interval) ->
receive
{batch, BatchRef, Batch} ->
Rest = match_nums(Batch, Nums),
random_sleep(Interval),
emqx_bridge_worker:handle_ack(Pid, BatchRef),
receiver_loop(Pid, Rest, Interval)
end.
random_sleep(MaxInterval) ->
case rand:uniform(MaxInterval) - 1 of
0 -> ok;
T -> timer:sleep(T)
end.
match_nums([], Rest) -> Rest;
match_nums([#message{payload = P} | Rest], Nums) ->
I = binary_to_integer(P),
case Nums of
[I | NumsLeft] -> match_nums(Rest, NumsLeft);
[J | _] when J > I -> match_nums(Rest, Nums); %% allow retry
_ -> error([{received, I}, {expecting, Nums}])
end.
make_config(Ref, TestPid, Result) -> make_config(Ref, TestPid, Result) ->
#{test_pid => TestPid, #{test_pid => TestPid,
test_ref => Ref, test_ref => Ref,
@ -165,8 +132,3 @@ make_config(Ref, TestPid, Result) ->
connect_result => Result, connect_result => Result,
start_type => auto start_type => auto
}. }.
make_msg(I) ->
Payload = integer_to_binary(I),
emqx_message:make(<<"test/topic">>, Payload).

View File

@ -1,6 +1,6 @@
{deps, {deps,
[ [
{gen_coap, {git, "https://github.com/emqx/gen_coap", {tag, "v0.3.0"}}} {gen_coap, {git, "https://github.com/emqx/gen_coap", {tag, "v0.3.1"}}}
]}. ]}.
{edoc_opts, [{preprocess, true}]}. {edoc_opts, [{preprocess, true}]}.

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel,stdlib,gen_coap,emqx]}, {applications, [kernel,stdlib,gen_coap]},
{mod, {emqx_coap_app, []}}, {mod, {emqx_coap_app, []}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -30,7 +30,7 @@ start(_Type, _Args) ->
{ok, Sup} = emqx_coap_sup:start_link(), {ok, Sup} = emqx_coap_sup:start_link(),
coap_server_registry:add_handler([<<"mqtt">>], emqx_coap_resource, undefined), coap_server_registry:add_handler([<<"mqtt">>], emqx_coap_resource, undefined),
coap_server_registry:add_handler([<<"ps">>], emqx_coap_ps_resource, undefined), coap_server_registry:add_handler([<<"ps">>], emqx_coap_ps_resource, undefined),
emqx_coap_ps_topics:start_link(), _ = emqx_coap_ps_topics:start_link(),
emqx_coap_server:start(application:get_all_env(?APP)), emqx_coap_server:start(application:get_all_env(?APP)),
{ok,Sup}. {ok,Sup}.

View File

@ -136,7 +136,7 @@ handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = Top
{reply, ok, State#state{sub_topics = NewTopics}, hibernate}; {reply, ok, State#state{sub_topics = NewTopics}, hibernate};
handle_call({publish, Topic, Payload}, _From, State) -> handle_call({publish, Topic, Payload}, _From, State) ->
chann_publish(Topic, Payload, State), _ = chann_publish(Topic, Payload, State),
{reply, ok, State}; {reply, ok, State};
handle_call(info, _From, State) -> handle_call(info, _From, State) ->
@ -233,10 +233,6 @@ do_deliver({Topic, Payload}, Subscribers) ->
%% handle PUBLISH packet from broker %% handle PUBLISH packet from broker
?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]), ?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]),
deliver_to_coap(Topic, Payload, Subscribers), deliver_to_coap(Topic, Payload, Subscribers),
ok;
do_deliver(Pkt, _Subscribers) ->
?LOG(warning, "unknown packet type to deliver, pkt=~p,", [Pkt]),
ok. ok.
deliver_to_coap(_TopicName, _Payload, []) -> deliver_to_coap(_TopicName, _Payload, []) ->

View File

@ -104,7 +104,7 @@ lookup_topic_payload(Topic) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
ets:new(?COAP_TOPIC_TABLE, [set, named_table, protected]), _ = ets:new(?COAP_TOPIC_TABLE, [set, named_table, protected]),
?LOG(debug, "Create the coap_topic table", []), ?LOG(debug, "Create the coap_topic table", []),
{ok, #state{}}. {ok, #state{}}.

View File

@ -86,8 +86,8 @@ stop() ->
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
init([]) -> init([]) ->
ets:new(?RESPONSE_TAB, [set, named_table, protected]), _ = ets:new(?RESPONSE_TAB, [set, named_table, protected]),
ets:new(?RESPONSE_REF_TAB, [set, named_table, protected]), _ = ets:new(?RESPONSE_REF_TAB, [set, named_table, protected]),
{ok, #state{}}. {ok, #state{}}.
handle_call({register_name, Name, Pid}, _From, State) -> handle_call({register_name, Name, Pid}, _From, State) ->

View File

@ -84,7 +84,7 @@ t_observe(_Config) ->
Notif = receive_notification(), Notif = receive_notification(),
?LOGT("observer get Notif=~p", [Notif]), ?LOGT("observer get Notif=~p", [Notif]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv}} = Notif, {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv}} = Notif,
?_assertEqual(Payload, PayloadRecv), ?assertEqual(Payload, PayloadRecv),
er_coap_observer:stop(Pid), er_coap_observer:stop(Pid),
timer:sleep(100), timer:sleep(100),
@ -107,7 +107,7 @@ t_observe_wildcard(_Config) ->
Notif = receive_notification(), Notif = receive_notification(),
?LOGT("observer get Notif=~p", [Notif]), ?LOGT("observer get Notif=~p", [Notif]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv}} = Notif, {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv}} = Notif,
?_assertEqual(Payload, PayloadRecv), ?assertEqual(Payload, PayloadRecv),
er_coap_observer:stop(Pid), er_coap_observer:stop(Pid),
timer:sleep(100), timer:sleep(100),
@ -133,7 +133,7 @@ t_observe_pub(_Config) ->
Notif2 = receive_notification(), Notif2 = receive_notification(),
?LOGT("observer get Notif2=~p", [Notif2]), ?LOGT("observer get Notif2=~p", [Notif2]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv2}} = Notif2, {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv2}} = Notif2,
?_assertEqual(Payload2, PayloadRecv2), ?assertEqual(Payload2, PayloadRecv2),
Topic3 = <<"j/b">>, Payload3 = <<"ET629">>, Topic3 = <<"j/b">>, Payload3 = <<"ET629">>,
TopicStr3 = http_uri:encode(binary_to_list(Topic3)), TopicStr3 = http_uri:encode(binary_to_list(Topic3)),
@ -144,7 +144,7 @@ t_observe_pub(_Config) ->
Notif3 = receive_notification(), Notif3 = receive_notification(),
?LOGT("observer get Notif3=~p", [Notif3]), ?LOGT("observer get Notif3=~p", [Notif3]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv3}} = Notif3, {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv3}} = Notif3,
?_assertEqual(Payload3, PayloadRecv3), ?assertEqual(Payload3, PayloadRecv3),
er_coap_observer:stop(Pid). er_coap_observer:stop(Pid).
@ -172,14 +172,14 @@ t_one_clientid_sub_2_topics(_Config) ->
Notif1 = receive_notification(), Notif1 = receive_notification(),
?LOGT("observer 1 get Notif=~p", [Notif1]), ?LOGT("observer 1 get Notif=~p", [Notif1]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv1}} = Notif1, {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv1}} = Notif1,
?_assertEqual(Payload1, PayloadRecv1), ?assertEqual(Payload1, PayloadRecv1),
emqx:publish(emqx_message:make(Topic2, Payload2)), emqx:publish(emqx_message:make(Topic2, Payload2)),
Notif2 = receive_notification(), Notif2 = receive_notification(),
?LOGT("observer 2 get Notif=~p", [Notif2]), ?LOGT("observer 2 get Notif=~p", [Notif2]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv2}} = Notif2, {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv2}} = Notif2,
?_assertEqual(Payload2, PayloadRecv2), ?assertEqual(Payload2, PayloadRecv2),
er_coap_observer:stop(Pid1), er_coap_observer:stop(Pid1),
er_coap_observer:stop(Pid2). er_coap_observer:stop(Pid2).

View File

@ -417,13 +417,13 @@ t_case01_subscribe(_Config) ->
?LOGT("observer get Notif=~p", [Notif]), ?LOGT("observer get Notif=~p", [Notif]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv}} = Notif, {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv}} = Notif,
?_assertEqual(Payload, PayloadRecv), ?assertEqual(Payload, PayloadRecv),
%% GET to read the publish message of the topic %% GET to read the publish message of the topic
Reply1 = er_coap_client:request(get, Uri1), Reply1 = er_coap_client:request(get, Uri1),
?LOGT("Reply=~p", [Reply1]), ?LOGT("Reply=~p", [Reply1]),
{ok,content, #coap_content{max_age = MaxAgeLeft,payload = <<"123">>}} = Reply1, {ok,content, #coap_content{max_age = MaxAgeLeft,payload = <<"123">>}} = Reply1,
?_assertEqual(true, MaxAgeLeft<60), ?assertEqual(true, MaxAgeLeft<60),
er_coap_observer:stop(Pid), er_coap_observer:stop(Pid),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri1). {ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri1).
@ -515,7 +515,7 @@ t_case01_read(_Config) ->
Reply1 = er_coap_client:request(get, Uri), Reply1 = er_coap_client:request(get, Uri),
?LOGT("Reply=~p", [Reply1]), ?LOGT("Reply=~p", [Reply1]),
{ok,content, #coap_content{max_age = MaxAgeLeft,payload = Payload}} = Reply1, {ok,content, #coap_content{max_age = MaxAgeLeft,payload = Payload}} = Reply1,
?_assertEqual(true, MaxAgeLeft<60), ?assertEqual(true, MaxAgeLeft<60),
{ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri). {ok, deleted, #coap_content{}} = er_coap_client:request(delete, Uri).

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1 +0,0 @@
webpackJsonp([20],{"4odX":function(t,e){},LbE0:function(t,e,s){"use strict";Object.defineProperty(e,"__esModule",{value:!0});var a={name:"help-view",components:{},data:function(){return{lang:window.localStorage.getItem("language")||"en"}},computed:{learnEnterprise:function(){return"zh"===this.lang?"https://www.emqx.io/cn/products/enterprise":"https://www.emqx.io/products/enterprise"},freeTrial:function(){return"zh"===this.lang?"https://www.emqx.io/cn/downloads#enterprise":"https://www.emqx.io/downloads#enterprise"},docsLink:function(){return"zh"===this.lang?"https://docs.emqx.io/broker/v4/cn":"https://docs.emqx.io/broker/v4/en"},faqLink:function(){return"zh"===this.lang?"https://docs.emqx.io/broker/latest/cn/faq/faq.html":"https://docs.emqx.io/tutorial/v4/en/faq/faq.html"}},methods:{}},n={render:function(){var t=this,e=t.$createElement,s=t._self._c||e;return s("div",{staticClass:"help-view"},[s("div",{staticClass:"page-title"},[t._v(t._s(t.$t("leftbar.help")))]),t._v(" "),s("div",{staticClass:"help-item"},[s("h3",[t._v(t._s(t.$t("help.quickStart")))]),t._v(" "),s("p",[t._v(t._s(t.$t("help.emqxDesc")))]),t._v(" "),s("a",{attrs:{target:"_blank",href:"https://github.com/emqx/emqx"}},[t._v("Github")])]),t._v(" "),s("el-divider"),t._v(" "),s("div",{staticClass:"help-item"},[s("h3",[t._v(t._s(t.$t("help.emqxEnterprise")))]),t._v(" "),s("p",{domProps:{innerHTML:t._s(t.$t("help.enterpriseDesc"))}}),t._v(" "),s("a",{attrs:{target:"_blank",href:t.learnEnterprise}},[t._v("\n "+t._s(t.$t("oper.learnMore"))+"\n ")]),t._v(" "),s("a",{attrs:{target:"_blank",href:t.freeTrial}},[t._v("\n "+t._s(t.$t("help.freeTrial"))+"\n ")])]),t._v(" "),s("el-divider"),t._v(" "),s("div",{staticClass:"help-item"},[s("h3",[t._v(t._s(t.$t("help.useDocs")))]),t._v(" "),s("p",[t._v(t._s(t.$t("help.docsDesc")))]),t._v(" "),s("a",{attrs:{target:"_blank",href:t.docsLink}},[t._v("\n "+t._s(t.$t("help.forwardView"))+"\n ")])]),t._v(" "),s("el-divider"),t._v(" "),s("div",{staticClass:"help-item"},[s("h3",[t._v("FAQ")]),t._v(" "),s("p",[t._v(t._s(t.$t("help.faqDesc")))]),t._v(" "),s("a",{attrs:{target:"_blank",href:t.faqLink}},[t._v("\n "+t._s(t.$t("help.forwardFaq"))+"\n ")])]),t._v(" "),s("el-divider"),t._v(" "),s("div",{staticClass:"help-item"},[s("h3",[t._v(t._s(t.$t("help.followUs")))]),t._v(" "),t._m(0),t._v(" "),t._m(1),t._v(" "),t._m(2),t._v(" "),t._m(3),t._v(" "),t._m(4),t._v(" "),t._m(5)])],1)},staticRenderFns:[function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://github.com/emqx/emqx"}},[e("i",{staticClass:"iconfont icon-git"})])},function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://twitter.com/emqtt"}},[e("i",{staticClass:"iconfont icon-tuite"})])},function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://emqx.slack.com/"}},[e("i",{staticClass:"iconfont icon-slack"})])},function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://stackoverflow.com/questions/tagged/emq"}},[e("i",{staticClass:"iconfont icon-stack-overflow"})])},function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://groups.google.com/forum/#!forum/emqtt"}},[e("i",{staticClass:"iconfont icon-icons-google_groups"})])},function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://www.youtube.com/channel/UCDU9GWFk8NTGiTvPx_2XskA"}},[e("i",{staticClass:"iconfont icon-youtube"})])}]};var i=s("VU/8")(a,n,!1,function(t){s("4odX")},null,null);e.default=i.exports}});

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1 +0,0 @@
!function(e){var a=window.webpackJsonp;window.webpackJsonp=function(c,t,f){for(var o,d,b,i=0,u=[];i<c.length;i++)d=c[i],n[d]&&u.push(n[d][0]),n[d]=0;for(o in t)Object.prototype.hasOwnProperty.call(t,o)&&(e[o]=t[o]);for(a&&a(c,t,f);u.length;)u.shift()();if(f)for(i=0;i<f.length;i++)b=r(r.s=f[i]);return b};var c={},n={27:0};function r(a){if(c[a])return c[a].exports;var n=c[a]={i:a,l:!1,exports:{}};return e[a].call(n.exports,n,n.exports,r),n.l=!0,n.exports}r.e=function(e){var a=n[e];if(0===a)return new Promise(function(e){e()});if(a)return a[2];var c=new Promise(function(c,r){a=n[e]=[c,r]});a[2]=c;var t=document.getElementsByTagName("head")[0],f=document.createElement("script");f.type="text/javascript",f.charset="utf-8",f.async=!0,f.timeout=12e4,r.nc&&f.setAttribute("nonce",r.nc),f.src=r.p+"static/js/"+e+"."+{0:"7a09d1383e1319441399",1:"fcd6fde8b053e80bc68f",2:"71ffb214c95162432f13",3:"25b49772270df4b9915d",4:"93d4473fcf7768693652",5:"8935139a413f40d70253",6:"ef8e6aa7a51fa7564f71",7:"92a348a80764134ff2a9",8:"e86f6131cc8a9138368d",9:"473ceac05f7dfe3f3e92",10:"188c5e479f887d471dde",11:"3861aeb3036b8f41a6e8",12:"43feccc8f1584bdba5c2",13:"026a13a2a59abd354bd5",14:"0342a1a3d29f1adca947",15:"7d11711536eb5b2ca561",16:"6bfd6f3eb9216e73149c",17:"1d56280c16e6e2b81cff",18:"a0c394cb4b55bee2fa82",19:"060521bb4ba4f7a81ac0",20:"308aa0fdf6653ef3299f",21:"306758a2a6ef73532290",22:"d968dc6f54a690adde18",23:"7837b8f015b7d486b74f",24:"ccbcfda924431cb282e2",25:"8b2dccd8a7e1f91a5040",26:"9cd922cc7e5d035cbcc7"}[e]+".js";var o=setTimeout(d,12e4);function d(){f.onerror=f.onload=null,clearTimeout(o);var a=n[e];0!==a&&(a&&a[1](new Error("Loading chunk "+e+" failed.")),n[e]=void 0)}return f.onerror=f.onload=d,t.appendChild(f),c},r.m=e,r.c=c,r.d=function(e,a,c){r.o(e,a)||Object.defineProperty(e,a,{configurable:!1,enumerable:!0,get:c})},r.n=function(e){var a=e&&e.__esModule?function(){return e.default}:function(){return e};return r.d(a,"a",a),a},r.o=function(e,a){return Object.prototype.hasOwnProperty.call(e,a)},r.p="/",r.oe=function(e){throw console.error(e),e}}([]);

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_dashboard_sup]}, {registered, [emqx_dashboard_sup]},
{applications, [kernel,stdlib,mnesia,minirest,emqx]}, {applications, [kernel,stdlib,mnesia,minirest]},
{mod, {emqx_dashboard_app,[]}}, {mod, {emqx_dashboard_app,[]}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -180,7 +180,7 @@ check(Username, Password) ->
init([]) -> init([]) ->
%% Add default admin user %% Add default admin user
add_default_user(binenv(default_user_username), binenv(default_user_passwd)), _ = add_default_user(binenv(default_user_username), binenv(default_user_passwd)),
{ok, state}. {ok, state}.
handle_call(_Req, _From, State) -> handle_call(_Req, _From, State) ->
@ -210,7 +210,7 @@ md5_hash(SaltBin, Password) ->
erlang:md5(<<SaltBin/binary, Password/binary>>). erlang:md5(<<SaltBin/binary, Password/binary>>).
salt() -> salt() ->
emqx_misc:rand_seed(), _ = emqx_misc:rand_seed(),
Salt = rand:uniform(16#ffffffff), Salt = rand:uniform(16#ffffffff),
<<Salt:32>>. <<Salt:32>>.

View File

@ -16,6 +16,7 @@
-module(emqx_dashboard_SUITE). -module(emqx_dashboard_SUITE).
-compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_ct_http, -import(emqx_ct_http,

View File

@ -1,24 +0,0 @@
%%-*- mode: erlang -*-
%% .app.src.script
RemoveLeadingV =
fun(Tag) ->
case re:run(Tag, "^[v]?[0-9]\.[0-9]\.([0-9]|(rc|beta|alpha)\.[0-9])", [{capture, none}]) of
nomatch ->
re:replace(Tag, "/", "-", [{return ,list}]);
_ ->
%% if it is a version number prefixed by 'v' or 'e', then remove it
re:replace(Tag, "[v]", "", [{return ,list}])
end
end,
case os:getenv("EMQX_DEPS_DEFAULT_VSN") of
false -> CONFIG; % env var not defined
[] -> CONFIG; % env var set to empty string
Tag ->
[begin
AppConf0 = lists:keystore(vsn, 1, AppConf, {vsn, RemoveLeadingV(Tag)}),
{application, App, AppConf0}
end || Conf = {application, App, AppConf} <- CONFIG]
end.

View File

@ -1,9 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -64,9 +64,9 @@ disable(Name) ->
unsave(Name) unsave(Name)
end. end.
-spec disable_all() -> [term()]. -spec disable_all() -> ok.
disable_all() -> disable_all() ->
[begin disable(Name), Name end || Name <- running()]. lists:foreach(fun disable/1, running()).
%%---------------------------------------------------------- %%----------------------------------------------------------
%% Dispatch APIs %% Dispatch APIs

View File

@ -45,7 +45,7 @@ start(_StartType, _StartArgs) ->
load_all_servers(), load_all_servers(),
%% Register all hooks %% Register all hooks
load_exhooks(), _ = load_exhooks(),
%% Register CLI %% Register CLI
emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []), emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []),
@ -53,8 +53,8 @@ start(_StartType, _StartArgs) ->
prep_stop(State) -> prep_stop(State) ->
emqx_ctl:unregister_command(exhook), emqx_ctl:unregister_command(exhook),
unload_exhooks(), _ = unload_exhooks(),
unload_all_servers(), ok = unload_all_servers(),
State. State.
stop(_State) -> stop(_State) ->

View File

@ -1,24 +0,0 @@
%%-*- mode: erlang -*-
%% .app.src.script
RemoveLeadingV =
fun(Tag) ->
case re:run(Tag, "^[v]?[0-9]\.[0-9]\.([0-9]|(rc|beta|alpha)\.[0-9])", [{capture, none}]) of
nomatch ->
re:replace(Tag, "/", "-", [{return ,list}]);
_ ->
%% if it is a version number prefixed by 'v' or 'e', then remove it
re:replace(Tag, "[v]", "", [{return ,list}])
end
end,
case os:getenv("EMQX_DEPS_DEFAULT_VSN") of
false -> CONFIG; % env var not defined
[] -> CONFIG; % env var set to empty string
Tag ->
[begin
AppConf0 = lists:keystore(vsn, 1, AppConf, {vsn, RemoveLeadingV(Tag)}),
{application, App, AppConf0}
end || Conf = {application, App, AppConf} <- CONFIG]
end.

View File

@ -1,9 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -331,7 +331,7 @@ handle_call({publish, Topic, Qos, Payload},
_ -> _ ->
Msg = emqx_message:make(From, Qos, Topic, Payload), Msg = emqx_message:make(From, Qos, Topic, Payload),
NMsg = emqx_mountpoint:mount(Mountpoint, Msg), NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
emqx:publish(NMsg), _ = emqx:publish(NMsg),
{reply, ok, Channel} {reply, ok, Channel}
end; end;

View File

@ -52,7 +52,7 @@
-record(state, { -record(state, {
%% TCP/SSL/UDP/DTLS Wrapped Socket %% TCP/SSL/UDP/DTLS Wrapped Socket
socket :: esockd:socket(), socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
%% Peername of the connection %% Peername of the connection
peername :: emqx_types:peername(), peername :: emqx_types:peername(),
%% Sockname of the connection %% Sockname of the connection
@ -451,8 +451,8 @@ handle_msg(Msg, State) ->
terminate(Reason, State = #state{channel = Channel}) -> terminate(Reason, State = #state{channel = Channel}) ->
?LOG(debug, "Terminated due to ~p", [Reason]), ?LOG(debug, "Terminated due to ~p", [Reason]),
emqx_exproto_channel:terminate(Reason, Channel), _ = emqx_exproto_channel:terminate(Reason, Channel),
close_socket(State), _ = close_socket(State),
exit(Reason). exit(Reason).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -527,10 +527,10 @@ handle_timeout(TRef, Msg, State) ->
process_incoming(Data, State = #state{idle_timer = IdleTimer}) -> process_incoming(Data, State = #state{idle_timer = IdleTimer}) ->
?LOG(debug, "RECV ~0p", [Data]), ?LOG(debug, "RECV ~0p", [Data]),
Oct = iolist_size(Data), Oct = iolist_size(Data),
emqx_pd:inc_counter(incoming_bytes, Oct), inc_counter(incoming_bytes, Oct),
emqx_pd:inc_counter(incoming_pkt, 1), inc_counter(incoming_pkt, 1),
emqx_pd:inc_counter(recv_pkt, 1), inc_counter(recv_pkt, 1),
emqx_pd:inc_counter(recv_msg, 1), inc_counter(recv_msg, 1),
% TODO: % TODO:
%ok = emqx_metrics:inc('bytes.received', Oct), %ok = emqx_metrics:inc('bytes.received', Oct),
@ -561,10 +561,10 @@ handle_outgoing(IoData, State = #state{socket = Socket}) ->
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
emqx_pd:inc_counter(send_pkt, 1), inc_counter(send_pkt, 1),
emqx_pd:inc_counter(send_msg, 1), inc_counter(send_msg, 1),
emqx_pd:inc_counter(outgoing_pkt, 1), inc_counter(outgoing_pkt, 1),
emqx_pd:inc_counter(outgoing_bytes, Oct), inc_counter(outgoing_bytes, Oct),
%% FIXME: %% FIXME:
%%ok = emqx_metrics:inc('bytes.sent', Oct), %%ok = emqx_metrics:inc('bytes.sent', Oct),
@ -680,3 +680,7 @@ stop(Reason, State) ->
stop(Reason, Reply, State) -> stop(Reason, Reply, State) ->
{stop, Reason, Reply, State}. {stop, Reason, Reply, State}.
inc_counter(Name, Value) ->
_ = emqx_pd:inc_counter(Name, Value),
ok.

View File

@ -74,7 +74,8 @@ handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast({rpc, Fun, Req, Options, From}, State) -> handle_cast({rpc, Fun, Req, Options, From}, State) ->
case catch apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of try
case apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of
{ok, Resp, _Metadata} -> {ok, Resp, _Metadata} ->
?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]), ?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]),
reply(From, Fun, {ok, Resp}); reply(From, Fun, {ok, Resp});
@ -85,11 +86,12 @@ handle_cast({rpc, Fun, Req, Options, From}, State) ->
{error, Reason} -> {error, Reason} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p", ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
[?CONN_ADAPTER_MOD, Fun, Req, Options, Reason]), [?CONN_ADAPTER_MOD, Fun, Req, Options, Reason]),
reply(From, Fun, {error, Reason});
{'EXIT', {Reason, Stk}} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p",
[?CONN_ADAPTER_MOD, Fun, Req, Options, Reason, Stk]),
reply(From, Fun, {error, Reason}) reply(From, Fun, {error, Reason})
end
catch _ : Rsn : Stk ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p",
[?CONN_ADAPTER_MOD, Fun, Req, Options, Rsn, Stk]),
reply(From, Fun, {error, Rsn})
end, end,
{noreply, State}. {noreply, State}.
@ -107,4 +109,5 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
reply(Pid, Fun, Result) -> reply(Pid, Fun, Result) ->
Pid ! {hreply, Fun, Result}. Pid ! {hreply, Fun, Result},
ok.

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel,stdlib,emqx]}, {applications, [kernel,stdlib]},
{mod, {emqx_lua_hook_app,[]}}, {mod, {emqx_lua_hook_app,[]}},
{env,[]}, {env,[]},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -147,7 +147,7 @@ do_load(FileName) ->
error; error;
{Ret1, St1} -> {Ret1, St1} ->
?LOG(debug, "Register lua script ~p", [FileName]), ?LOG(debug, "Register lua script ~p", [FileName]),
do_register_hooks(Ret1, FileName, St1), _ = do_register_hooks(Ret1, FileName, St1),
{FileName, St1}; {FileName, St1};
Other -> Other ->
?LOG(error, "Failed to load lua script ~p, register_hook() raise exception ~p", [FileName, Other]), ?LOG(error, "Failed to load lua script ~p, register_hook() raise exception ~p", [FileName, Other]),
@ -184,7 +184,7 @@ do_register(Hook, ScriptName, _St) ->
do_register_hooks([], _ScriptName, _St) -> do_register_hooks([], _ScriptName, _St) ->
ok; ok;
do_register_hooks([H|T], ScriptName, St) -> do_register_hooks([H|T], ScriptName, St) ->
do_register(H, ScriptName, St), _ = do_register(H, ScriptName, St),
do_register_hooks(T, ScriptName, St); do_register_hooks(T, ScriptName, St);
do_register_hooks(Hook = <<$o, $n, _Rest/binary>>, ScriptName, St) -> do_register_hooks(Hook = <<$o, $n, _Rest/binary>>, ScriptName, St) ->
do_register(Hook, ScriptName, St); do_register(Hook, ScriptName, St);

View File

@ -1,5 +1,5 @@
{deps, {deps,
[{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.1"}}} [{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.2"}}}
]}. ]}.
{profiles, {profiles,
@ -25,3 +25,4 @@
{cover_enabled, true}. {cover_enabled, true}.
{cover_opts, [verbose]}. {cover_opts, [verbose]}.
{cover_export_enabled, true}. {cover_export_enabled, true}.
{extra_src_dirs, [{"lwm2m_xml", [{recursive,true}]}]}.

View File

@ -3,5 +3,5 @@
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.0"}, % strict semver, bump manually!
{modules,[]}, {modules,[]},
{registered,[emqx_lwm2m_sup]}, {registered,[emqx_lwm2m_sup]},
{applications,[kernel,stdlib,lwm2m_coap,emqx]}, {applications,[kernel,stdlib,lwm2m_coap]},
{mod,{emqx_lwm2m_app,[]}}]}. {mod,{emqx_lwm2m_app,[]}}]}.

View File

@ -30,7 +30,7 @@
start(_Type, _Args) -> start(_Type, _Args) ->
Pid = emqx_lwm2m_sup:start_link(), Pid = emqx_lwm2m_sup:start_link(),
lwm2m_coap_server:start_registry(), _ = lwm2m_coap_server:start_registry(),
lwm2m_coap_server_registry:add_handler([<<"rd">>], emqx_lwm2m_coap_resource, undefined), lwm2m_coap_server_registry:add_handler([<<"rd">>], emqx_lwm2m_coap_resource, undefined),
emqx_lwm2m_coap_server:start(application:get_all_env(?APP)), emqx_lwm2m_coap_server:start(application:get_all_env(?APP)),
Pid. Pid.

View File

@ -240,10 +240,10 @@ insert_resource_into_object_instance([ResourceId, ResourceInstanceId], Value, Ac
?LOG(debug, "insert_resource_into_object_instance1() ResourceId=~p, ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceId, ResourceInstanceId, Value, Acc]), ?LOG(debug, "insert_resource_into_object_instance1() ResourceId=~p, ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceId, ResourceInstanceId, Value, Acc]),
case find_resource(ResourceId, Acc) of case find_resource(ResourceId, Acc) of
undefined -> undefined ->
NewList = insert_resource_instance_into_resource(ResourceInstanceId, Value, []), NewList = insert_resource_instance_into_resource([ResourceInstanceId], Value, []),
Acc++[#{tlv_multiple_resource=>integer(ResourceId), value=>NewList}]; Acc++[#{tlv_multiple_resource=>integer(ResourceId), value=>NewList}];
Resource = #{value:=List}-> Resource = #{value:=List}->
NewList = insert_resource_instance_into_resource(ResourceInstanceId, Value, List), NewList = insert_resource_instance_into_resource([ResourceInstanceId], Value, List),
Acc2 = lists:delete(Resource, Acc), Acc2 = lists:delete(Resource, Acc),
Acc2 ++ [Resource#{value=>NewList}] Acc2 ++ [Resource#{value=>NewList}]
end; end;
@ -251,18 +251,18 @@ insert_resource_into_object_instance([ResourceId], Value, Acc) ->
?LOG(debug, "insert_resource_into_object_instance2() ResourceId=~p, Value=~p, Acc=~p", [ResourceId, Value, Acc]), ?LOG(debug, "insert_resource_into_object_instance2() ResourceId=~p, Value=~p, Acc=~p", [ResourceId, Value, Acc]),
NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value}, NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value},
case find_resource(ResourceId, Acc) of case find_resource(ResourceId, Acc) of
undeinfed -> undefined ->
Acc ++ [NewMap]; Acc ++ [NewMap];
Resource -> Resource ->
Acc2 = lists:delete(Resource, Acc), Acc2 = lists:delete(Resource, Acc),
Acc2 ++ [NewMap] Acc2 ++ [NewMap]
end. end.
insert_resource_instance_into_resource(ResourceInstanceId, Value, Acc) -> insert_resource_instance_into_resource([ResourceInstanceId], Value, Acc) ->
?LOG(debug, "insert_resource_instance_into_resource() ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceInstanceId, Value, Acc]), ?LOG(debug, "insert_resource_instance_into_resource() ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceInstanceId, Value, Acc]),
NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value}, NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value},
case find_resource_instance(ResourceInstanceId, Acc) of case find_resource_instance(ResourceInstanceId, Acc) of
undeinfed -> undefined ->
Acc ++ [NewMap]; Acc ++ [NewMap];
Resource -> Resource ->
Acc2 = lists:delete(Resource, Acc), Acc2 = lists:delete(Resource, Acc),

View File

@ -229,7 +229,7 @@ insert_resource_into_object_instance([ResourceId, ResourceInstanceId], Value, Ac
insert_resource_into_object_instance([ResourceId], Value, Acc) -> insert_resource_into_object_instance([ResourceId], Value, Acc) ->
NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value}, NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value},
case find_resource(ResourceId, Acc) of case find_resource(ResourceId, Acc) of
undeinfed -> undefined ->
Acc ++ [NewMap]; Acc ++ [NewMap];
Resource -> Resource ->
Acc2 = lists:delete(Resource, Acc), Acc2 = lists:delete(Resource, Acc),
@ -239,7 +239,7 @@ insert_resource_into_object_instance([ResourceId], Value, Acc) ->
insert_resource_instance_into_resource(ResourceInstanceId, Value, Acc) -> insert_resource_instance_into_resource(ResourceInstanceId, Value, Acc) ->
NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value}, NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value},
case find_resource_instance(ResourceInstanceId, Acc) of case find_resource_instance(ResourceInstanceId, Acc) of
undeinfed -> undefined ->
Acc ++ [NewMap]; Acc ++ [NewMap];
Resource -> Resource ->
Acc2 = lists:delete(Resource, Acc), Acc2 = lists:delete(Resource, Acc),

View File

@ -109,7 +109,7 @@ post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName,
Topic = downlink_topic(<<"register">>, Lwm2mState), Topic = downlink_topic(<<"register">>, Lwm2mState),
subscribe(Topic, Lwm2mState), subscribe(Topic, Lwm2mState),
%% - report the registration info %% - report the registration info
send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState), _ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
Lwm2mState#lwm2m_state{mqtt_topic = Topic}. Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo, update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo,
@ -124,7 +124,7 @@ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, regi
end, end,
%% - flush cached donwlink commands %% - flush cached donwlink commands
flush_cached_downlink_messages(CoapPid), _ = flush_cached_downlink_messages(CoapPid),
%% - update the life timer %% - update the life timer
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer( UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
@ -136,16 +136,16 @@ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, regi
replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
coap_pid = CoapPid}) -> coap_pid = CoapPid}) ->
send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState), _ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
%% - flush cached donwlink commands %% - flush cached donwlink commands
flush_cached_downlink_messages(CoapPid), _ = flush_cached_downlink_messages(CoapPid),
%% - update the life timer %% - update the life timer
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer( UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
maps:get(<<"lt">>, NewRegInfo), LifeTimer), maps:get(<<"lt">>, NewRegInfo), LifeTimer),
send_auto_observe(CoapPid, NewRegInfo), _ = send_auto_observe(CoapPid, NewRegInfo),
?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]), ?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer, Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
@ -153,13 +153,13 @@ replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
send_ul_data(_EventType, <<>>, _Lwm2mState) -> ok; send_ul_data(_EventType, <<>>, _Lwm2mState) -> ok;
send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) -> send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) ->
send_to_broker(EventType, Payload, Lwm2mState), _ = send_to_broker(EventType, Payload, Lwm2mState),
flush_cached_downlink_messages(CoapPid), _ = flush_cached_downlink_messages(CoapPid),
Lwm2mState. Lwm2mState.
auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo, auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
coap_pid = CoapPid}) -> coap_pid = CoapPid}) ->
send_auto_observe(CoapPid, RegInfo), _ = send_auto_observe(CoapPid, RegInfo),
Lwm2mState. Lwm2mState.
deliver(#message{topic = Topic, payload = Payload}, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, register_info = RegInfo, started_at = StartedAt}) -> deliver(#message{topic = Topic, payload = Payload}, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, register_info = RegInfo, started_at = StartedAt}) ->
@ -297,7 +297,7 @@ observe_object(AlternatePath, ObjectPath, CoapPid) ->
do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) -> do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
erlang:spawn(fun() -> erlang:spawn(fun() ->
lists:foreach(fun({CoapRequest, Ref}) -> lists:foreach(fun({CoapRequest, Ref}) ->
do_deliver_to_coap(CoapPid, CoapRequest, Ref), _ = do_deliver_to_coap(CoapPid, CoapRequest, Ref),
timer:sleep(Interval) timer:sleep(Interval)
end, lists:reverse(CoapRequestList)) end, lists:reverse(CoapRequestList))
end). end).

View File

@ -33,7 +33,7 @@
logger:Level("LWM2M-TIMER: " ++ Format, Args)). logger:Level("LWM2M-TIMER: " ++ Format, Args)).
cancel_timer(#timer_state{tref = TRef}) when is_reference(TRef) -> cancel_timer(#timer_state{tref = TRef}) when is_reference(TRef) ->
erlang:cancel_timer(TRef), ok. _ = erlang:cancel_timer(TRef), ok.
refresh_timer(State=#timer_state{interval = Interval, message = Msg}) -> refresh_timer(State=#timer_state{interval = Interval, message = Msg}) ->
cancel_timer(State), start_timer(Interval, Msg). cancel_timer(State), start_timer(Interval, Msg).

View File

@ -20,6 +20,11 @@
, encode/1 , encode/1
]). ]).
-ifdef(TEST).
-export([binary_to_hex_string/1]).
-endif.
-include("emqx_lwm2m.hrl"). -include("emqx_lwm2m.hrl").
-define(LOG(Level, Format, Args), logger:Level("LWM2M-TLV: " ++ Format, Args)). -define(LOG(Level, Format, Args), logger:Level("LWM2M-TLV: " ++ Format, Args)).

View File

@ -86,8 +86,8 @@ stop() ->
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
init([]) -> init([]) ->
ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]), _ = ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]),
ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]), _ = ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]),
PluginsEtcDir = emqx:get_env(plugins_etc_dir), PluginsEtcDir = emqx:get_env(plugins_etc_dir),
DefBaseDir = re:replace(PluginsEtcDir, "plugins", "lwm2m_xml", [{return, list}]), DefBaseDir = re:replace(PluginsEtcDir, "plugins", "lwm2m_xml", [{return, list}]),
BaseDir = application:get_env(emqx_lwm2m, xml_dir, DefBaseDir), BaseDir = application:get_env(emqx_lwm2m, xml_dir, DefBaseDir),

View File

@ -16,6 +16,7 @@
-module(test_mqtt_broker). -module(test_mqtt_broker).
-compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-define(LOGT(Format, Args), logger:debug("TEST_BROKER: " ++ Format, Args)). -define(LOGT(Format, Args), logger:debug("TEST_BROKER: " ++ Format, Args)).
@ -28,8 +29,6 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-import(proplists, [get_value/2, get_value/3]).
start(_, <<"attacker">>, _, _, _) -> start(_, <<"attacker">>, _, _, _) ->
{stop, auth_failure}; {stop, auth_failure};
start(ClientId, Username, Password, _Channel, KeepaliveInterval) -> start(ClientId, Username, Password, _Channel, KeepaliveInterval) ->
@ -51,7 +50,7 @@ unsubscribe(Topic) ->
gen_server:call(?MODULE, {unsubscribe, Topic}). gen_server:call(?MODULE, {unsubscribe, Topic}).
get_subscrbied_topics() -> get_subscrbied_topics() ->
[Topic || {Client, Topic} <- ets:tab2list(emqx_subscription)]. [Topic || {_Client, Topic} <- ets:tab2list(emqx_subscription)].
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

View File

@ -3,7 +3,7 @@
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.0"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_management_sup]}, {registered, [emqx_management_sup]},
{applications, [kernel,stdlib,minirest,emqx]}, {applications, [kernel,stdlib,minirest]},
{mod, {emqx_mgmt_app,[]}}, {mod, {emqx_mgmt_app,[]}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -817,7 +817,7 @@ to_version(Version) when is_list(Version) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
enable_telemetry() -> enable_telemetry() ->
[enable_telemetry(Node) || Node <- ekka_mnesia:running_nodes()], ok. lists:foreach(fun enable_telemetry/1,ekka_mnesia:running_nodes()).
enable_telemetry(Node) when Node =:= node() -> enable_telemetry(Node) when Node =:= node() ->
emqx_telemetry:enable(); emqx_telemetry:enable();
@ -825,7 +825,7 @@ enable_telemetry(Node) ->
rpc_call(Node, enable_telemetry, [Node]). rpc_call(Node, enable_telemetry, [Node]).
disable_telemetry() -> disable_telemetry() ->
[disable_telemetry(Node) || Node <- ekka_mnesia:running_nodes()], ok. lists:foreach(fun disable_telemetry/1,ekka_mnesia:running_nodes()).
disable_telemetry(Node) when Node =:= node() -> disable_telemetry(Node) when Node =:= node() ->
emqx_telemetry:disable(); emqx_telemetry:disable();

View File

@ -37,7 +37,9 @@ paginate(Tables, Params, RowFun) ->
Limit = limit(Params), Limit = limit(Params),
Cursor = qlc:cursor(Qh), Cursor = qlc:cursor(Qh),
case Page > 1 of case Page > 1 of
true -> qlc:next_answers(Cursor, (Page - 1) * Limit); true ->
_ = qlc:next_answers(Cursor, (Page - 1) * Limit),
ok;
false -> ok false -> ok
end, end,
Rows = qlc:next_answers(Cursor, Limit), Rows = qlc:next_answers(Cursor, Limit),

View File

@ -111,7 +111,7 @@ deactivate(_Bindings, Params) ->
do_deactivate(Node, Name). do_deactivate(Node, Name).
delete_deactivated(Bindings, _Params) when map_size(Bindings) == 0 -> delete_deactivated(Bindings, _Params) when map_size(Bindings) == 0 ->
emqx_mgmt:delete_all_deactivated_alarms(), _ = emqx_mgmt:delete_all_deactivated_alarms(),
{ok, #{code => ?SUCCESS}}; {ok, #{code => ?SUCCESS}};
delete_deactivated(#{node := Node}, _Params) -> delete_deactivated(#{node := Node}, _Params) ->

View File

@ -187,11 +187,11 @@ do_import(Filename) ->
emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])), emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])), emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),
emqx_mgmt:import_modules(maps:get(<<"modules">>, Data, [])), emqx_mgmt:import_modules(maps:get(<<"modules">>, Data, [])),
emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), _ = emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])), _ = emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version), _ = emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version), _ = emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])), _ = emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])),
logger:debug("The emqx data has been imported successfully"), logger:debug("The emqx data has been imported successfully"),
ok ok
catch Class:Reason:Stack -> catch Class:Reason:Stack ->

View File

@ -26,7 +26,7 @@
start(_Type, _Args) -> start(_Type, _Args) ->
{ok, Sup} = emqx_mgmt_sup:start_link(), {ok, Sup} = emqx_mgmt_sup:start_link(),
emqx_mgmt_auth:add_default_app(), _ = emqx_mgmt_auth:add_default_app(),
emqx_mgmt_http:start_listeners(), emqx_mgmt_http:start_listeners(),
emqx_mgmt_cli:load(), emqx_mgmt_cli:load(),
{ok, Sup}. {ok, Sup}.

View File

@ -416,7 +416,7 @@ log(["primary-level"]) ->
emqx_ctl:print("~s~n", [Level]); emqx_ctl:print("~s~n", [Level]);
log(["primary-level", Level]) -> log(["primary-level", Level]) ->
emqx_logger:set_primary_log_level(list_to_atom(Level)), _ = emqx_logger:set_primary_log_level(list_to_atom(Level)),
emqx_ctl:print("~s~n", [emqx_logger:get_primary_log_level()]); emqx_ctl:print("~s~n", [emqx_logger:get_primary_log_level()]);
log(["handlers", "list"]) -> log(["handlers", "list"]) ->
@ -606,11 +606,11 @@ data(["import", Filename]) ->
emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])), emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])),
emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])), emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])), emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),
emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), _ = emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])), _ = emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, [])), _ = emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, [])),
emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, [])), _ = emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, [])),
emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])), _ = emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])),
emqx_ctl:print("The emqx data has been imported successfully.~n") emqx_ctl:print("The emqx data has been imported successfully.~n")
catch Class:Reason:Stack -> catch Class:Reason:Stack ->
emqx_ctl:print("The emqx data import failed due: ~0p~n", [{Class,Reason,Stack}]) emqx_ctl:print("The emqx data import failed due: ~0p~n", [{Class,Reason,Stack}])

Some files were not shown because too many files have changed in this diff Show More