Merge pull request #3814 from zmstone/umbrella-fix-build

Umbrella fix build
This commit is contained in:
Zaiming Shi 2020-12-11 09:05:27 +01:00 committed by GitHub
commit c8d949bc0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
94 changed files with 547 additions and 427 deletions

View File

@ -7,10 +7,10 @@ jobs:
run_test_case: run_test_case:
runs-on: ubuntu-latest runs-on: ubuntu-latest
container: container:
image: erlang:22.1 image: erlang:22.1
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- name: Code dialyzer - name: Code dialyzer

View File

@ -2,7 +2,7 @@ REBAR_VERSION = 3.14.3-emqx-2
REBAR = ./rebar3 REBAR = ./rebar3
PROFILE ?= emqx PROFILE ?= emqx
PROFILES := emqx emqx-edge PROFILES := emqx emqx-edge check test
PKG_PROFILES := emqx-pkg emqx-edge-pkg PKG_PROFILES := emqx-pkg emqx-edge-pkg
export REBAR_GIT_CLONE_OPTIONS += --depth=1 export REBAR_GIT_CLONE_OPTIONS += --depth=1
@ -38,14 +38,10 @@ $(PROFILES:%=build-%): $(REBAR)
# rebar clean # rebar clean
.PHONY: clean $(PROFILES:%=clean-%) .PHONY: clean $(PROFILES:%=clean-%)
clean: $(PROFILES:%=clean-%) clean-stamps clean: $(PROFILES:%=clean-%)
$(PROFILES:%=clean-%): $(REBAR) $(PROFILES:%=clean-%): $(REBAR)
$(REBAR) as $(@:clean-%=%) clean $(REBAR) as $(@:clean-%=%) clean
.PHONY: clean-stamps
clean-stamps:
find -L _build -name '.stamp' -type f | xargs rm -f
.PHONY: deps-all .PHONY: deps-all
deps-all: $(REBAR) $(PROFILES:%=deps-%) $(PKG_PROFILES:%=deps-%) deps-all: $(REBAR) $(PROFILES:%=deps-%) $(PKG_PROFILES:%=deps-%)

View File

@ -29,6 +29,10 @@
, feedvar/2 , feedvar/2
]). ]).
-type http_request() :: #http_request{method::'get' | 'post',params::[any()]}.
%-type http_opts() :: #{clientid:=_, peerhost:=_, protocol:=_, _=>_}.
%-type retry_opts() :: #{backoff:=_, interval:=_, times:=_, _=>_}.
%% Callbacks %% Callbacks
-export([ register_metrics/0 -export([ register_metrics/0
, check/3 , check/3
@ -80,7 +84,7 @@ authenticate(PoolName, #http_request{path = Path,
request_timeout = RequestTimeout}, ClientInfo) -> request_timeout = RequestTimeout}, ClientInfo) ->
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), RequestTimeout). request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), RequestTimeout).
-spec(is_superuser(atom(), maybe(#http_request{}), emqx_types:client()) -> boolean()). -spec(is_superuser(atom(), maybe(http_request()), emqx_types:client()) -> boolean()).
is_superuser(_PoolName, undefined, _ClientInfo) -> is_superuser(_PoolName, undefined, _ClientInfo) ->
false; false;
is_superuser(PoolName, #http_request{path = Path, is_superuser(PoolName, #http_request{path = Path,

View File

@ -36,8 +36,8 @@ start(_StartType, _StartArgs) ->
ok -> ok ->
{ok, PoolOpts} = application:get_env(?APP, pool_opts), {ok, PoolOpts} = application:get_env(?APP, pool_opts),
{ok, Sup} = emqx_http_client_sup:start_link(?APP, ssl(inet(PoolOpts))), {ok, Sup} = emqx_http_client_sup:start_link(?APP, ssl(inet(PoolOpts))),
with_env(auth_req, fun load_auth_hook/1), _ = with_env(auth_req, fun load_auth_hook/1),
with_env(acl_req, fun load_acl_hook/1), _ = with_env(acl_req, fun load_acl_hook/1),
{ok, Sup}; {ok, Sup};
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
@ -157,4 +157,4 @@ get_addr(Hostname) ->
Addr; Addr;
{ok, Addr} -> Addr {ok, Addr} -> Addr
end end
end. end.

View File

@ -33,11 +33,10 @@ start(_Type, _Args) ->
{ok, Pid} = start_auth_server(jwks_svr_options()), {ok, Pid} = start_auth_server(jwks_svr_options()),
ok = emqx_auth_jwt:register_metrics(), ok = emqx_auth_jwt:register_metrics(),
AuthEnv0 = auth_env(), AuthEnv0 = auth_env(),
AuthEnv1 = AuthEnv0#{pid => Pid}, AuthEnv1 = AuthEnv0#{pid => Pid},
emqx:hook('client.authenticate', {emqx_auth_jwt, check, [AuthEnv1]}), _ = emqx:hook('client.authenticate', {emqx_auth_jwt, check, [AuthEnv1]}),
{ok, Sup, AuthEnv1}. {ok, Sup, AuthEnv1}.
stop(AuthEnv) -> stop(AuthEnv) ->

View File

@ -176,7 +176,7 @@ reset_timer(State = #state{intv = Intv}) ->
cancel_timer(State = #state{tref = undefined}) -> cancel_timer(State = #state{tref = undefined}) ->
State; State;
cancel_timer(State = #state{tref = TRef}) -> cancel_timer(State = #state{tref = TRef}) ->
erlang:cancel_timer(TRef), _ = erlang:cancel_timer(TRef),
State#state{tref = undefined}. State#state{tref = undefined}.
do_verify(_JwsCompated, []) -> do_verify(_JwsCompated, []) ->

View File

@ -30,11 +30,11 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_auth_ldap_sup:start_link(), {ok, Sup} = emqx_auth_ldap_sup:start_link(),
if_enabled([device_dn, match_objectclass, _ = if_enabled([device_dn, match_objectclass,
username_attr, password_attr, username_attr, password_attr,
filters, custom_base_dn, bind_as_user], filters, custom_base_dn, bind_as_user],
fun load_auth_hook/1), fun load_auth_hook/1),
if_enabled([device_dn, match_objectclass, _ = if_enabled([device_dn, match_objectclass,
username_attr, password_attr, username_attr, password_attr,
filters, custom_base_dn, bind_as_user], filters, custom_base_dn, bind_as_user],
fun load_acl_hook/1), fun load_acl_hook/1),
@ -60,8 +60,7 @@ load_acl_hook(DeviceDn) ->
if_enabled(Cfgs, Fun) -> if_enabled(Cfgs, Fun) ->
case get_env(Cfgs) of case get_env(Cfgs) of
{ok, InitArgs} -> Fun(InitArgs); {ok, InitArgs} -> Fun(InitArgs)
[] -> ok
end. end.
get_env(Cfgs) -> get_env(Cfgs) ->

View File

@ -151,7 +151,7 @@ do_add(Params) ->
Action = urldecode(get_value(<<"action">>, Params)), Action = urldecode(get_value(<<"action">>, Params)),
Access = urldecode(get_value(<<"access">>, Params)), Access = urldecode(get_value(<<"access">>, Params)),
Re = case validate([login, topic, action, access], [Login, Topic, Action, Access]) of Re = case validate([login, topic, action, access], [Login, Topic, Action, Access]) of
ok -> ok ->
emqx_acl_mnesia_cli:add_acl(Login, Topic, erlang:binary_to_atom(Action, utf8), erlang:binary_to_atom(Access, utf8)); emqx_acl_mnesia_cli:add_acl(Login, Topic, erlang:binary_to_atom(Action, utf8), erlang:binary_to_atom(Access, utf8));
Err -> Err Err -> Err
end, end,
@ -163,7 +163,7 @@ do_add(Params) ->
all -> #{all => '$all'}; all -> #{all => '$all'};
_ -> maps:from_list([Login]) _ -> maps:from_list([Login])
end). end).
delete(#{clientid := Clientid, topic := Topic}, _) -> delete(#{clientid := Clientid, topic := Topic}, _) ->
return(emqx_acl_mnesia_cli:remove_acl({clientid, urldecode(Clientid)}, urldecode(Topic))); return(emqx_acl_mnesia_cli:remove_acl({clientid, urldecode(Clientid)}, urldecode(Topic)));
delete(#{username := Username, topic := Topic}, _) -> delete(#{username := Username, topic := Topic}, _) ->
@ -202,12 +202,6 @@ do_validation(login, {clientid, V}) when is_binary(V)
do_validation(login, {username, V}) when is_binary(V) do_validation(login, {username, V}) when is_binary(V)
andalso byte_size(V) > 0-> andalso byte_size(V) > 0->
true; true;
do_validation(clientid, V) when is_binary(V)
andalso byte_size(V) > 0 ->
true;
do_validation(username, V) when is_binary(V)
andalso byte_size(V) > 0 ->
true;
do_validation(topic, V) when is_binary(V) do_validation(topic, V) when is_binary(V)
andalso byte_size(V) > 0 -> andalso byte_size(V) > 0 ->
true; true;

View File

@ -37,9 +37,9 @@ init(#{clientid_list := ClientidList, username_list := UsernameList}) ->
{disc_copies, [node()]}, {disc_copies, [node()]},
{attributes, record_info(fields, emqx_user)}, {attributes, record_info(fields, emqx_user)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]), {storage_properties, [{ets, [{read_concurrency, true}]}]}]),
[ add_default_user({{clientid, iolist_to_binary(Clientid)}, iolist_to_binary(Password)}) _ = [ add_default_user({{clientid, iolist_to_binary(Clientid)}, iolist_to_binary(Password)})
|| {Clientid, Password} <- ClientidList], || {Clientid, Password} <- ClientidList],
[ add_default_user({{username, iolist_to_binary(Username)}, iolist_to_binary(Password)}) _ = [ add_default_user({{username, iolist_to_binary(Username)}, iolist_to_binary(Password)})
|| {Username, Password} <- UsernameList], || {Username, Password} <- UsernameList],
ok = ekka_mnesia:copy_table(emqx_user, disc_copies). ok = ekka_mnesia:copy_table(emqx_user, disc_copies).
@ -59,7 +59,7 @@ check(ClientInfo = #{ clientid := Clientid
({?TABLE, {username, X }, Password, InterTime}) when X =:= Username andalso X =/= undefined -> Password ({?TABLE, {username, X }, Password, InterTime}) when X =:= Username andalso X =/= undefined -> Password
end), end),
case ets:select(?TABLE, MatchSpec) of case ets:select(?TABLE, MatchSpec) of
[] -> [] ->
emqx_metrics:inc(?AUTH_METRICS(ignore)), emqx_metrics:inc(?AUTH_METRICS(ignore)),
ok; ok;
List -> List ->

View File

@ -145,7 +145,7 @@ do_add_clientid(Params) ->
Password = urldecode(get_value(<<"password">>, Params)), Password = urldecode(get_value(<<"password">>, Params)),
Login = {clientid, Clientid}, Login = {clientid, Clientid},
case validate([login, password], [Login, Password]) of case validate([login, password], [Login, Password]) of
ok -> ok ->
emqx_auth_mnesia_cli:add_user(Login, Password); emqx_auth_mnesia_cli:add_user(Login, Password);
Err -> Err Err -> Err
end. end.
@ -221,7 +221,9 @@ paginate(Tables, MatchSpec, Params, ComparingFun, RowFun) ->
Limit = limit(Params), Limit = limit(Params),
Cursor = qlc:cursor(Qh), Cursor = qlc:cursor(Qh),
case Page > 1 of case Page > 1 of
true -> qlc:next_answers(Cursor, (Page - 1) * Limit); true ->
_ = qlc:next_answers(Cursor, (Page - 1) * Limit),
ok;
false -> ok false -> ok
end, end,
Rows = qlc:next_answers(Cursor, Limit), Rows = qlc:next_answers(Cursor, Limit),
@ -263,14 +265,6 @@ limit(Params) ->
%% Interval Funcs %% Interval Funcs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
format({?TABLE, {clientid, ClientId}, Password, _InterTime}) ->
#{clientid => ClientId,
password => Password};
format({?TABLE, {username, Username}, Password, _InterTime}) ->
#{username => Username,
password => Password};
format([{?TABLE, {clientid, ClientId}, Password, _InterTime}]) -> format([{?TABLE, {clientid, ClientId}, Password, _InterTime}]) ->
#{clientid => ClientId, #{clientid => ClientId,
password => Password}; password => Password};

View File

@ -38,8 +38,8 @@ start(_StartType, _StartArgs) ->
emqx_ctl:register_command(username, {emqx_auth_mnesia_cli, auth_username_cli}, []), emqx_ctl:register_command(username, {emqx_auth_mnesia_cli, auth_username_cli}, []),
emqx_ctl:register_command(user, {emqx_auth_mnesia_cli, auth_username_cli}, []), emqx_ctl:register_command(user, {emqx_auth_mnesia_cli, auth_username_cli}, []),
emqx_ctl:register_command(acl, {emqx_acl_mnesia_cli, cli}, []), emqx_ctl:register_command(acl, {emqx_acl_mnesia_cli, cli}, []),
load_auth_hook(), _ = load_auth_hook(),
load_acl_hook(), _ = load_acl_hook(),
{ok, Sup}. {ok, Sup}.
prep_stop(State) -> prep_stop(State) ->

View File

@ -68,13 +68,8 @@ do_update_user(User = #emqx_user{login = Login}) ->
-spec(lookup_user(tuple()) -> list()). -spec(lookup_user(tuple()) -> list()).
lookup_user(undefined) -> []; lookup_user(undefined) -> [];
lookup_user(Login) -> lookup_user(Login) ->
case mnesia:dirty_read(?TABLE, Login) of Re = mnesia:dirty_read(?TABLE, Login),
{error, Reason} -> lists:sort(fun comparing/2, Re).
?LOG(error, "[Mnesia] do_check_user error: ~p~n", [Reason]),
[];
Re ->
lists:sort(fun comparing/2, Re)
end.
%% @doc Remove user %% @doc Remove user
-spec(remove_user(tuple()) -> ok | {error, any()}). -spec(remove_user(tuple()) -> ok | {error, any()}).
@ -88,7 +83,6 @@ all_users() -> mnesia:dirty_all_keys(?TABLE).
all_users(clientid) -> all_users(clientid) ->
MatchSpec = ets:fun2ms(fun({?TABLE, {clientid, Clientid}, Password, CreatedAt}) -> {?TABLE, {clientid, Clientid}, Password, CreatedAt} end), MatchSpec = ets:fun2ms(fun({?TABLE, {clientid, Clientid}, Password, CreatedAt}) -> {?TABLE, {clientid, Clientid}, Password, CreatedAt} end),
lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec)); lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec));
all_users(username) -> all_users(username) ->
MatchSpec = ets:fun2ms(fun({?TABLE, {username, Username}, Password, CreatedAt}) -> {?TABLE, {username, Username}, Password, CreatedAt} end), MatchSpec = ets:fun2ms(fun({?TABLE, {username, Username}, Password, CreatedAt}) -> {?TABLE, {username, Username}, Password, CreatedAt} end),
lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec)). lists:sort(fun comparing/2, ets:select(?TABLE, MatchSpec)).
@ -167,7 +161,6 @@ auth_username_cli(["update", Username, NewPassword]) ->
ok -> emqx_ctl:print("ok~n"); ok -> emqx_ctl:print("ok~n");
{error, Reason} -> emqx_ctl:print("Error: ~p~n", [Reason]) {error, Reason} -> emqx_ctl:print("Error: ~p~n", [Reason])
end; end;
auth_username_cli(["del", Username]) -> auth_username_cli(["del", Username]) ->
case remove_user({username, iolist_to_binary(Username)}) of case remove_user({username, iolist_to_binary(Username)}) of
ok -> emqx_ctl:print("ok~n"); ok -> emqx_ctl:print("ok~n");

View File

@ -82,8 +82,6 @@ description() -> "Authentication with MongoDB".
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Is Superuser? %% Is Superuser?
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(is_superuser(string(), maybe(#superquery{}), emqx_types:clientinfo()) -> boolean()).
is_superuser(_Pool, undefined, _ClientInfo) -> is_superuser(_Pool, undefined, _ClientInfo) ->
false; false;
is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Selector}, ClientInfo) -> is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Selector}, ClientInfo) ->

View File

@ -36,8 +36,8 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_auth_mysql_sup:start_link(), {ok, Sup} = emqx_auth_mysql_sup:start_link(),
if_enabled(auth_query, fun load_auth_hook/1), _ = if_enabled(auth_query, fun load_auth_hook/1),
if_enabled(acl_query, fun load_acl_hook/1), _ = if_enabled(acl_query, fun load_acl_hook/1),
{ok, Sup}. {ok, Sup}.

View File

@ -29,6 +29,8 @@
, equery/3 , equery/3
]). ]).
-type client_info() :: #{username:=_, clientid:=_, peerhost:=_, _=>_}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Avoid SQL Injection: Parse SQL to Parameter Query. %% Avoid SQL Injection: Parse SQL to Parameter Query.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -62,9 +64,6 @@ connect(Opts) ->
{ok, C} -> {ok, C} ->
conn_post(C), conn_post(C),
{ok, C}; {ok, C};
{error, Reason = econnrefused} ->
?LOG(error, "[Postgres] Can't connect to Postgres server: Connection refused."),
{error, Reason};
{error, Reason = invalid_authorization_specification} -> {error, Reason = invalid_authorization_specification} ->
?LOG(error, "[Postgres] Can't connect to Postgres server: Invalid authorization specification."), ?LOG(error, "[Postgres] Can't connect to Postgres server: Invalid authorization specification."),
{error, Reason}; {error, Reason};
@ -104,9 +103,11 @@ conn_opts([Opt = {ssl_opts, _}|Opts], Acc) ->
conn_opts([_Opt|Opts], Acc) -> conn_opts([_Opt|Opts], Acc) ->
conn_opts(Opts, Acc). conn_opts(Opts, Acc).
-spec(equery(atom(), string() | epgsql:statement(), Parameters::[any()]) -> {ok, ColumnsDescription :: [any()], RowsValues :: [any()]} | {error, any()} ).
equery(Pool, Sql, Params) -> equery(Pool, Sql, Params) ->
ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, Params) end). ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, Params) end).
-spec(equery(atom(), string() | epgsql:statement(), Parameters::[any()], client_info()) -> {ok, ColumnsDescription :: [any()], RowsValues :: [any()]} | {error, any()} ).
equery(Pool, Sql, Params, ClientInfo) -> equery(Pool, Sql, Params, ClientInfo) ->
ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, replvar(Params, ClientInfo)) end). ecpool:with_client(Pool, fun(C) -> epgsql:prepared_query(C, Sql, replvar(Params, ClientInfo)) end).

View File

@ -28,8 +28,8 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_auth_redis_sup:start_link(), {ok, Sup} = emqx_auth_redis_sup:start_link(),
if_cmd_enabled(auth_cmd, fun load_auth_hook/1), _ = if_cmd_enabled(auth_cmd, fun load_auth_hook/1),
if_cmd_enabled(acl_cmd, fun load_acl_hook/1), _ = if_cmd_enabled(acl_cmd, fun load_acl_hook/1),
{ok, Sup}. {ok, Sup}.
stop(_State) -> stop(_State) ->

View File

@ -38,7 +38,7 @@ connect(Opts) ->
Host = case Sentinel =:= "" of Host = case Sentinel =:= "" of
true -> get_value(host, Opts); true -> get_value(host, Opts);
false -> false ->
eredis_sentinel:start_link(get_value(servers, Opts)), _ = eredis_sentinel:start_link(get_value(servers, Opts)),
"sentinel:" ++ Sentinel "sentinel:" ++ Sentinel
end, end,
case eredis:start_link(Host, case eredis:start_link(Host,

View File

@ -186,6 +186,5 @@ replvar([Key|More], Options) ->
%% ${node} => node() %% ${node} => node()
feedvar(clientid, ClientId, _) -> feedvar(clientid, ClientId, _) ->
iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node()))); iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node()))).
feedvar(_, Val, _) ->
Val.

View File

@ -56,16 +56,12 @@ cli(["forwards", Name]) ->
end, emqx_bridge_worker:get_forwards(Name)); end, emqx_bridge_worker:get_forwards(Name));
cli(["add-forward", Name, Topic]) -> cli(["add-forward", Name, Topic]) ->
case emqx_bridge_worker:ensure_forward_present(Name, iolist_to_binary(Topic)) of ok = emqx_bridge_worker:ensure_forward_present(Name, iolist_to_binary(Topic)),
ok -> emqx_ctl:print("Add-forward topic successfully.~n"); emqx_ctl:print("Add-forward topic successfully.~n");
{error, Reason} -> emqx_ctl:print("Add-forward failed reason: ~p.~n", [Reason])
end;
cli(["del-forward", Name, Topic]) -> cli(["del-forward", Name, Topic]) ->
case emqx_bridge_worker:ensure_forward_absent(Name, iolist_to_binary(Topic)) of ok = emqx_bridge_worker:ensure_forward_absent(Name, iolist_to_binary(Topic)),
ok -> emqx_ctl:print("Del-forward topic successfully.~n"); emqx_ctl:print("Del-forward topic successfully.~n");
{error, Reason} -> emqx_ctl:print("Del-forward failed reason: ~p.~n", [Reason])
end;
cli(["subscriptions", Name]) -> cli(["subscriptions", Name]) ->
foreach(fun({Topic, Qos}) -> foreach(fun({Topic, Qos}) ->
@ -79,10 +75,8 @@ cli(["add-subscription", Name, Topic, Qos]) ->
end; end;
cli(["del-subscription", Name, Topic]) -> cli(["del-subscription", Name, Topic]) ->
case emqx_bridge_worker:ensure_subscription_absent(Name, Topic) of ok = emqx_bridge_worker:ensure_subscription_absent(Name, Topic),
ok -> emqx_ctl:print("Del-subscription topic successfully.~n"); emqx_ctl:print("Del-subscription topic successfully.~n");
{error, Reason} -> emqx_ctl:print("Del-subscription failed reason: ~p.~n", [Reason])
end;
cli(_) -> cli(_) ->
emqx_ctl:usage([{"bridges list", "List bridges"}, emqx_ctl:usage([{"bridges list", "List bridges"},

View File

@ -33,6 +33,7 @@
-type ack_ref() :: emqx_bridge_worker:ack_ref(). -type ack_ref() :: emqx_bridge_worker:ack_ref().
-type batch() :: emqx_bridge_worker:batch(). -type batch() :: emqx_bridge_worker:batch().
-type node_or_tuple() :: atom() | {atom(), term()}.
-define(HEARTBEAT_INTERVAL, timer:seconds(1)). -define(HEARTBEAT_INTERVAL, timer:seconds(1)).
@ -61,7 +62,7 @@ stop(#{client_pid := Pid}) when is_pid(Pid) ->
ok. ok.
%% @doc Callback for `emqx_bridge_connect' behaviour %% @doc Callback for `emqx_bridge_connect' behaviour
-spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}. -spec send(#{address:=node_or_tuple(), _=>_}, batch()) -> {ok, ack_ref()} | {error, any()}.
send(#{address := Remote}, Batch) -> send(#{address := Remote}, Batch) ->
case ?RPC:call(Remote, ?MODULE, handle_send, [Batch]) of case ?RPC:call(Remote, ?MODULE, handle_send, [Batch]) of
ok -> ok ->

View File

@ -307,7 +307,7 @@ idle({call, From}, ensure_started, State) ->
case do_connect(State) of case do_connect(State) of
{ok, State1} -> {ok, State1} ->
{next_state, connected, State1, [{reply, From, ok}, {state_timeout, 0, connected}]}; {next_state, connected, State1, [{reply, From, ok}, {state_timeout, 0, connected}]};
{error, Reason} -> {error, Reason, _State} ->
{keep_state_and_data, [{reply, From, {error, Reason}}]} {keep_state_and_data, [{reply, From, {error, Reason}}]}
end; end;
%% @doc Standing by for manual start. %% @doc Standing by for manual start.
@ -320,12 +320,8 @@ idle(state_timeout, reconnect, State) ->
connecting(State); connecting(State);
idle(info, {batch_ack, Ref}, State) -> idle(info, {batch_ack, Ref}, State) ->
case do_ack(State, Ref) of {ok, NewState} = do_ack(State, Ref),
{ok, NewState} -> {keep_state, NewState};
{keep_state, NewState};
_ ->
keep_state_and_data
end;
idle(Type, Content, State) -> idle(Type, Content, State) ->
common(idle, Type, Content, State). common(idle, Type, Content, State).
@ -359,12 +355,8 @@ connected(info, {disconnected, Conn, Reason},
keep_state_and_data keep_state_and_data
end; end;
connected(info, {batch_ack, Ref}, State) -> connected(info, {batch_ack, Ref}, State) ->
case do_ack(State, Ref) of {ok, NewState} = do_ack(State, Ref),
{ok, NewState} -> {keep_state, NewState, {next_event, internal, maybe_send}};
{keep_state, NewState, {next_event, internal, maybe_send}};
_ ->
keep_state_and_data
end;
connected(Type, Content, State) -> connected(Type, Content, State) ->
common(connected, Type, Content, State). common(connected, Type, Content, State).

View File

@ -1,6 +1,6 @@
{deps, {deps,
[ [
{gen_coap, {git, "https://github.com/emqx/gen_coap", {tag, "v0.3.0"}}} {gen_coap, {git, "https://github.com/emqx/gen_coap", {tag, "v0.3.1"}}}
]}. ]}.
{edoc_opts, [{preprocess, true}]}. {edoc_opts, [{preprocess, true}]}.

View File

@ -30,7 +30,7 @@ start(_Type, _Args) ->
{ok, Sup} = emqx_coap_sup:start_link(), {ok, Sup} = emqx_coap_sup:start_link(),
coap_server_registry:add_handler([<<"mqtt">>], emqx_coap_resource, undefined), coap_server_registry:add_handler([<<"mqtt">>], emqx_coap_resource, undefined),
coap_server_registry:add_handler([<<"ps">>], emqx_coap_ps_resource, undefined), coap_server_registry:add_handler([<<"ps">>], emqx_coap_ps_resource, undefined),
emqx_coap_ps_topics:start_link(), _ = emqx_coap_ps_topics:start_link(),
emqx_coap_server:start(application:get_all_env(?APP)), emqx_coap_server:start(application:get_all_env(?APP)),
{ok,Sup}. {ok,Sup}.

View File

@ -136,7 +136,7 @@ handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = Top
{reply, ok, State#state{sub_topics = NewTopics}, hibernate}; {reply, ok, State#state{sub_topics = NewTopics}, hibernate};
handle_call({publish, Topic, Payload}, _From, State) -> handle_call({publish, Topic, Payload}, _From, State) ->
chann_publish(Topic, Payload, State), _ = chann_publish(Topic, Payload, State),
{reply, ok, State}; {reply, ok, State};
handle_call(info, _From, State) -> handle_call(info, _From, State) ->
@ -233,10 +233,6 @@ do_deliver({Topic, Payload}, Subscribers) ->
%% handle PUBLISH packet from broker %% handle PUBLISH packet from broker
?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]), ?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]),
deliver_to_coap(Topic, Payload, Subscribers), deliver_to_coap(Topic, Payload, Subscribers),
ok;
do_deliver(Pkt, _Subscribers) ->
?LOG(warning, "unknown packet type to deliver, pkt=~p,", [Pkt]),
ok. ok.
deliver_to_coap(_TopicName, _Payload, []) -> deliver_to_coap(_TopicName, _Payload, []) ->

View File

@ -104,7 +104,7 @@ lookup_topic_payload(Topic) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
ets:new(?COAP_TOPIC_TABLE, [set, named_table, protected]), _ = ets:new(?COAP_TOPIC_TABLE, [set, named_table, protected]),
?LOG(debug, "Create the coap_topic table", []), ?LOG(debug, "Create the coap_topic table", []),
{ok, #state{}}. {ok, #state{}}.

View File

@ -86,8 +86,8 @@ stop() ->
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
init([]) -> init([]) ->
ets:new(?RESPONSE_TAB, [set, named_table, protected]), _ = ets:new(?RESPONSE_TAB, [set, named_table, protected]),
ets:new(?RESPONSE_REF_TAB, [set, named_table, protected]), _ = ets:new(?RESPONSE_REF_TAB, [set, named_table, protected]),
{ok, #state{}}. {ok, #state{}}.
handle_call({register_name, Name, Pid}, _From, State) -> handle_call({register_name, Name, Pid}, _From, State) ->

View File

@ -137,7 +137,7 @@ change_password_hash(Username, PasswordHash) ->
update_pwd(Username, Fun) -> update_pwd(Username, Fun) ->
Trans = fun() -> Trans = fun() ->
User = User =
case lookup_user(Username) of case lookup_user(Username) of
[Admin] -> Admin; [Admin] -> Admin;
[] -> [] ->
@ -180,7 +180,7 @@ check(Username, Password) ->
init([]) -> init([]) ->
%% Add default admin user %% Add default admin user
add_default_user(binenv(default_user_username), binenv(default_user_passwd)), _ = add_default_user(binenv(default_user_username), binenv(default_user_passwd)),
{ok, state}. {ok, state}.
handle_call(_Req, _From, State) -> handle_call(_Req, _From, State) ->
@ -210,7 +210,7 @@ md5_hash(SaltBin, Password) ->
erlang:md5(<<SaltBin/binary, Password/binary>>). erlang:md5(<<SaltBin/binary, Password/binary>>).
salt() -> salt() ->
emqx_misc:rand_seed(), _ = emqx_misc:rand_seed(),
Salt = rand:uniform(16#ffffffff), Salt = rand:uniform(16#ffffffff),
<<Salt:32>>. <<Salt:32>>.

View File

@ -64,9 +64,9 @@ disable(Name) ->
unsave(Name) unsave(Name)
end. end.
-spec disable_all() -> [term()]. -spec disable_all() -> ok.
disable_all() -> disable_all() ->
[begin disable(Name), Name end || Name <- running()]. lists:foreach(fun disable/1, running()).
%%---------------------------------------------------------- %%----------------------------------------------------------
%% Dispatch APIs %% Dispatch APIs

View File

@ -45,7 +45,7 @@ start(_StartType, _StartArgs) ->
load_all_servers(), load_all_servers(),
%% Register all hooks %% Register all hooks
load_exhooks(), _ = load_exhooks(),
%% Register CLI %% Register CLI
emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []), emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []),
@ -53,8 +53,8 @@ start(_StartType, _StartArgs) ->
prep_stop(State) -> prep_stop(State) ->
emqx_ctl:unregister_command(exhook), emqx_ctl:unregister_command(exhook),
unload_exhooks(), _ = unload_exhooks(),
unload_all_servers(), ok = unload_all_servers(),
State. State.
stop(_State) -> stop(_State) ->

View File

@ -331,7 +331,7 @@ handle_call({publish, Topic, Qos, Payload},
_ -> _ ->
Msg = emqx_message:make(From, Qos, Topic, Payload), Msg = emqx_message:make(From, Qos, Topic, Payload),
NMsg = emqx_mountpoint:mount(Mountpoint, Msg), NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
emqx:publish(NMsg), _ = emqx:publish(NMsg),
{reply, ok, Channel} {reply, ok, Channel}
end; end;

View File

@ -52,7 +52,7 @@
-record(state, { -record(state, {
%% TCP/SSL/UDP/DTLS Wrapped Socket %% TCP/SSL/UDP/DTLS Wrapped Socket
socket :: esockd:socket(), socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
%% Peername of the connection %% Peername of the connection
peername :: emqx_types:peername(), peername :: emqx_types:peername(),
%% Sockname of the connection %% Sockname of the connection
@ -451,8 +451,8 @@ handle_msg(Msg, State) ->
terminate(Reason, State = #state{channel = Channel}) -> terminate(Reason, State = #state{channel = Channel}) ->
?LOG(debug, "Terminated due to ~p", [Reason]), ?LOG(debug, "Terminated due to ~p", [Reason]),
emqx_exproto_channel:terminate(Reason, Channel), _ = emqx_exproto_channel:terminate(Reason, Channel),
close_socket(State), _ = close_socket(State),
exit(Reason). exit(Reason).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -527,10 +527,10 @@ handle_timeout(TRef, Msg, State) ->
process_incoming(Data, State = #state{idle_timer = IdleTimer}) -> process_incoming(Data, State = #state{idle_timer = IdleTimer}) ->
?LOG(debug, "RECV ~0p", [Data]), ?LOG(debug, "RECV ~0p", [Data]),
Oct = iolist_size(Data), Oct = iolist_size(Data),
emqx_pd:inc_counter(incoming_bytes, Oct), inc_counter(incoming_bytes, Oct),
emqx_pd:inc_counter(incoming_pkt, 1), inc_counter(incoming_pkt, 1),
emqx_pd:inc_counter(recv_pkt, 1), inc_counter(recv_pkt, 1),
emqx_pd:inc_counter(recv_msg, 1), inc_counter(recv_msg, 1),
% TODO: % TODO:
%ok = emqx_metrics:inc('bytes.received', Oct), %ok = emqx_metrics:inc('bytes.received', Oct),
@ -561,10 +561,10 @@ handle_outgoing(IoData, State = #state{socket = Socket}) ->
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
emqx_pd:inc_counter(send_pkt, 1), inc_counter(send_pkt, 1),
emqx_pd:inc_counter(send_msg, 1), inc_counter(send_msg, 1),
emqx_pd:inc_counter(outgoing_pkt, 1), inc_counter(outgoing_pkt, 1),
emqx_pd:inc_counter(outgoing_bytes, Oct), inc_counter(outgoing_bytes, Oct),
%% FIXME: %% FIXME:
%%ok = emqx_metrics:inc('bytes.sent', Oct), %%ok = emqx_metrics:inc('bytes.sent', Oct),
@ -680,3 +680,7 @@ stop(Reason, State) ->
stop(Reason, Reply, State) -> stop(Reason, Reply, State) ->
{stop, Reason, Reply, State}. {stop, Reason, Reply, State}.
inc_counter(Name, Value) ->
_ = emqx_pd:inc_counter(Name, Value),
ok.

View File

@ -74,22 +74,24 @@ handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast({rpc, Fun, Req, Options, From}, State) -> handle_cast({rpc, Fun, Req, Options, From}, State) ->
case catch apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of try
{ok, Resp, _Metadata} -> case apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of
?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]), {ok, Resp, _Metadata} ->
reply(From, Fun, {ok, Resp}); ?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]),
{error, {Code, Msg}, _Metadata} -> reply(From, Fun, {ok, Resp});
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", {error, {Code, Msg}, _Metadata} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
[?CONN_ADAPTER_MOD, Fun, Req, Options, Code, Msg]), [?CONN_ADAPTER_MOD, Fun, Req, Options, Code, Msg]),
reply(From, Fun, {error, {Code, Msg}}); reply(From, Fun, {error, {Code, Msg}});
{error, Reason} -> {error, Reason} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p", ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
[?CONN_ADAPTER_MOD, Fun, Req, Options, Reason]), [?CONN_ADAPTER_MOD, Fun, Req, Options, Reason]),
reply(From, Fun, {error, Reason}); reply(From, Fun, {error, Reason})
{'EXIT', {Reason, Stk}} -> end
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p", catch _ : Rsn : Stk ->
[?CONN_ADAPTER_MOD, Fun, Req, Options, Reason, Stk]), ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p",
reply(From, Fun, {error, Reason}) [?CONN_ADAPTER_MOD, Fun, Req, Options, Rsn, Stk]),
reply(From, Fun, {error, Rsn})
end, end,
{noreply, State}. {noreply, State}.
@ -107,4 +109,5 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
reply(Pid, Fun, Result) -> reply(Pid, Fun, Result) ->
Pid ! {hreply, Fun, Result}. Pid ! {hreply, Fun, Result},
ok.

View File

@ -147,7 +147,7 @@ do_load(FileName) ->
error; error;
{Ret1, St1} -> {Ret1, St1} ->
?LOG(debug, "Register lua script ~p", [FileName]), ?LOG(debug, "Register lua script ~p", [FileName]),
do_register_hooks(Ret1, FileName, St1), _ = do_register_hooks(Ret1, FileName, St1),
{FileName, St1}; {FileName, St1};
Other -> Other ->
?LOG(error, "Failed to load lua script ~p, register_hook() raise exception ~p", [FileName, Other]), ?LOG(error, "Failed to load lua script ~p, register_hook() raise exception ~p", [FileName, Other]),
@ -184,7 +184,7 @@ do_register(Hook, ScriptName, _St) ->
do_register_hooks([], _ScriptName, _St) -> do_register_hooks([], _ScriptName, _St) ->
ok; ok;
do_register_hooks([H|T], ScriptName, St) -> do_register_hooks([H|T], ScriptName, St) ->
do_register(H, ScriptName, St), _ = do_register(H, ScriptName, St),
do_register_hooks(T, ScriptName, St); do_register_hooks(T, ScriptName, St);
do_register_hooks(Hook = <<$o, $n, _Rest/binary>>, ScriptName, St) -> do_register_hooks(Hook = <<$o, $n, _Rest/binary>>, ScriptName, St) ->
do_register(Hook, ScriptName, St); do_register(Hook, ScriptName, St);

View File

@ -1,5 +1,5 @@
{deps, {deps,
[{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.1"}}} [{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v1.1.2"}}}
]}. ]}.
{profiles, {profiles,

View File

@ -30,7 +30,7 @@
start(_Type, _Args) -> start(_Type, _Args) ->
Pid = emqx_lwm2m_sup:start_link(), Pid = emqx_lwm2m_sup:start_link(),
lwm2m_coap_server:start_registry(), _ = lwm2m_coap_server:start_registry(),
lwm2m_coap_server_registry:add_handler([<<"rd">>], emqx_lwm2m_coap_resource, undefined), lwm2m_coap_server_registry:add_handler([<<"rd">>], emqx_lwm2m_coap_resource, undefined),
emqx_lwm2m_coap_server:start(application:get_all_env(?APP)), emqx_lwm2m_coap_server:start(application:get_all_env(?APP)),
Pid. Pid.

View File

@ -240,10 +240,10 @@ insert_resource_into_object_instance([ResourceId, ResourceInstanceId], Value, Ac
?LOG(debug, "insert_resource_into_object_instance1() ResourceId=~p, ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceId, ResourceInstanceId, Value, Acc]), ?LOG(debug, "insert_resource_into_object_instance1() ResourceId=~p, ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceId, ResourceInstanceId, Value, Acc]),
case find_resource(ResourceId, Acc) of case find_resource(ResourceId, Acc) of
undefined -> undefined ->
NewList = insert_resource_instance_into_resource(ResourceInstanceId, Value, []), NewList = insert_resource_instance_into_resource([ResourceInstanceId], Value, []),
Acc++[#{tlv_multiple_resource=>integer(ResourceId), value=>NewList}]; Acc++[#{tlv_multiple_resource=>integer(ResourceId), value=>NewList}];
Resource = #{value:=List}-> Resource = #{value:=List}->
NewList = insert_resource_instance_into_resource(ResourceInstanceId, Value, List), NewList = insert_resource_instance_into_resource([ResourceInstanceId], Value, List),
Acc2 = lists:delete(Resource, Acc), Acc2 = lists:delete(Resource, Acc),
Acc2 ++ [Resource#{value=>NewList}] Acc2 ++ [Resource#{value=>NewList}]
end; end;
@ -251,18 +251,18 @@ insert_resource_into_object_instance([ResourceId], Value, Acc) ->
?LOG(debug, "insert_resource_into_object_instance2() ResourceId=~p, Value=~p, Acc=~p", [ResourceId, Value, Acc]), ?LOG(debug, "insert_resource_into_object_instance2() ResourceId=~p, Value=~p, Acc=~p", [ResourceId, Value, Acc]),
NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value}, NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value},
case find_resource(ResourceId, Acc) of case find_resource(ResourceId, Acc) of
undeinfed -> undefined ->
Acc ++ [NewMap]; Acc ++ [NewMap];
Resource -> Resource ->
Acc2 = lists:delete(Resource, Acc), Acc2 = lists:delete(Resource, Acc),
Acc2 ++ [NewMap] Acc2 ++ [NewMap]
end. end.
insert_resource_instance_into_resource(ResourceInstanceId, Value, Acc) -> insert_resource_instance_into_resource([ResourceInstanceId], Value, Acc) ->
?LOG(debug, "insert_resource_instance_into_resource() ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceInstanceId, Value, Acc]), ?LOG(debug, "insert_resource_instance_into_resource() ResourceInstanceId=~p, Value=~p, Acc=~p", [ResourceInstanceId, Value, Acc]),
NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value}, NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value},
case find_resource_instance(ResourceInstanceId, Acc) of case find_resource_instance(ResourceInstanceId, Acc) of
undeinfed -> undefined ->
Acc ++ [NewMap]; Acc ++ [NewMap];
Resource -> Resource ->
Acc2 = lists:delete(Resource, Acc), Acc2 = lists:delete(Resource, Acc),

View File

@ -229,7 +229,7 @@ insert_resource_into_object_instance([ResourceId, ResourceInstanceId], Value, Ac
insert_resource_into_object_instance([ResourceId], Value, Acc) -> insert_resource_into_object_instance([ResourceId], Value, Acc) ->
NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value}, NewMap = #{tlv_resource_with_value=>integer(ResourceId), value=>Value},
case find_resource(ResourceId, Acc) of case find_resource(ResourceId, Acc) of
undeinfed -> undefined ->
Acc ++ [NewMap]; Acc ++ [NewMap];
Resource -> Resource ->
Acc2 = lists:delete(Resource, Acc), Acc2 = lists:delete(Resource, Acc),
@ -239,7 +239,7 @@ insert_resource_into_object_instance([ResourceId], Value, Acc) ->
insert_resource_instance_into_resource(ResourceInstanceId, Value, Acc) -> insert_resource_instance_into_resource(ResourceInstanceId, Value, Acc) ->
NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value}, NewMap = #{tlv_resource_instance=>integer(ResourceInstanceId), value=>Value},
case find_resource_instance(ResourceInstanceId, Acc) of case find_resource_instance(ResourceInstanceId, Acc) of
undeinfed -> undefined ->
Acc ++ [NewMap]; Acc ++ [NewMap];
Resource -> Resource ->
Acc2 = lists:delete(Resource, Acc), Acc2 = lists:delete(Resource, Acc),

View File

@ -109,7 +109,7 @@ post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName,
Topic = downlink_topic(<<"register">>, Lwm2mState), Topic = downlink_topic(<<"register">>, Lwm2mState),
subscribe(Topic, Lwm2mState), subscribe(Topic, Lwm2mState),
%% - report the registration info %% - report the registration info
send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState), _ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
Lwm2mState#lwm2m_state{mqtt_topic = Topic}. Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo, update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo,
@ -124,7 +124,7 @@ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, regi
end, end,
%% - flush cached donwlink commands %% - flush cached donwlink commands
flush_cached_downlink_messages(CoapPid), _ = flush_cached_downlink_messages(CoapPid),
%% - update the life timer %% - update the life timer
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer( UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
@ -136,16 +136,16 @@ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, regi
replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
coap_pid = CoapPid}) -> coap_pid = CoapPid}) ->
send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState), _ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
%% - flush cached donwlink commands %% - flush cached donwlink commands
flush_cached_downlink_messages(CoapPid), _ = flush_cached_downlink_messages(CoapPid),
%% - update the life timer %% - update the life timer
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer( UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
maps:get(<<"lt">>, NewRegInfo), LifeTimer), maps:get(<<"lt">>, NewRegInfo), LifeTimer),
send_auto_observe(CoapPid, NewRegInfo), _ = send_auto_observe(CoapPid, NewRegInfo),
?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]), ?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer, Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
@ -153,13 +153,13 @@ replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
send_ul_data(_EventType, <<>>, _Lwm2mState) -> ok; send_ul_data(_EventType, <<>>, _Lwm2mState) -> ok;
send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) -> send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) ->
send_to_broker(EventType, Payload, Lwm2mState), _ = send_to_broker(EventType, Payload, Lwm2mState),
flush_cached_downlink_messages(CoapPid), _ = flush_cached_downlink_messages(CoapPid),
Lwm2mState. Lwm2mState.
auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo, auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
coap_pid = CoapPid}) -> coap_pid = CoapPid}) ->
send_auto_observe(CoapPid, RegInfo), _ = send_auto_observe(CoapPid, RegInfo),
Lwm2mState. Lwm2mState.
deliver(#message{topic = Topic, payload = Payload}, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, register_info = RegInfo, started_at = StartedAt}) -> deliver(#message{topic = Topic, payload = Payload}, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, register_info = RegInfo, started_at = StartedAt}) ->
@ -297,7 +297,7 @@ observe_object(AlternatePath, ObjectPath, CoapPid) ->
do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) -> do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
erlang:spawn(fun() -> erlang:spawn(fun() ->
lists:foreach(fun({CoapRequest, Ref}) -> lists:foreach(fun({CoapRequest, Ref}) ->
do_deliver_to_coap(CoapPid, CoapRequest, Ref), _ = do_deliver_to_coap(CoapPid, CoapRequest, Ref),
timer:sleep(Interval) timer:sleep(Interval)
end, lists:reverse(CoapRequestList)) end, lists:reverse(CoapRequestList))
end). end).

View File

@ -33,7 +33,7 @@
logger:Level("LWM2M-TIMER: " ++ Format, Args)). logger:Level("LWM2M-TIMER: " ++ Format, Args)).
cancel_timer(#timer_state{tref = TRef}) when is_reference(TRef) -> cancel_timer(#timer_state{tref = TRef}) when is_reference(TRef) ->
erlang:cancel_timer(TRef), ok. _ = erlang:cancel_timer(TRef), ok.
refresh_timer(State=#timer_state{interval = Interval, message = Msg}) -> refresh_timer(State=#timer_state{interval = Interval, message = Msg}) ->
cancel_timer(State), start_timer(Interval, Msg). cancel_timer(State), start_timer(Interval, Msg).

View File

@ -20,6 +20,11 @@
, encode/1 , encode/1
]). ]).
-ifdef(TEST).
-export([binary_to_hex_string/1]).
-endif.
-include("emqx_lwm2m.hrl"). -include("emqx_lwm2m.hrl").
-define(LOG(Level, Format, Args), logger:Level("LWM2M-TLV: " ++ Format, Args)). -define(LOG(Level, Format, Args), logger:Level("LWM2M-TLV: " ++ Format, Args)).

View File

@ -86,8 +86,8 @@ stop() ->
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
init([]) -> init([]) ->
ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]), _ = ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]),
ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]), _ = ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]),
PluginsEtcDir = emqx:get_env(plugins_etc_dir), PluginsEtcDir = emqx:get_env(plugins_etc_dir),
DefBaseDir = re:replace(PluginsEtcDir, "plugins", "lwm2m_xml", [{return, list}]), DefBaseDir = re:replace(PluginsEtcDir, "plugins", "lwm2m_xml", [{return, list}]),
BaseDir = application:get_env(emqx_lwm2m, xml_dir, DefBaseDir), BaseDir = application:get_env(emqx_lwm2m, xml_dir, DefBaseDir),

View File

@ -585,7 +585,7 @@ delete_all_deactivated_alarms() ->
delete_all_deactivated_alarms(Node) when Node =:= node() -> delete_all_deactivated_alarms(Node) when Node =:= node() ->
emqx_alarm:delete_all_deactivated_alarms(); emqx_alarm:delete_all_deactivated_alarms();
delete_all_deactivated_alarms(Node) -> delete_all_deactivated_alarms(Node) ->
rpc_call(Node, delete_deactivated_alarms, [Node]). rpc_call(Node, delete_deactivated_alarms, [Node]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -664,7 +664,7 @@ export_auth_username() ->
export_auth_mnesia() -> export_auth_mnesia() ->
case ets:info(emqx_user) of case ets:info(emqx_user) of
undefined -> []; undefined -> [];
_ -> _ ->
lists:foldl(fun({_, Login, Password, IsSuperuser}, Acc) -> lists:foldl(fun({_, Login, Password, IsSuperuser}, Acc) ->
[[{login, Login}, {password, Password}, {is_superuser, IsSuperuser}] | Acc] [[{login, Login}, {password, Password}, {is_superuser, IsSuperuser}] | Acc]
end, [], ets:tab2list(emqx_user)) end, [], ets:tab2list(emqx_user))
@ -764,7 +764,7 @@ import_auth_clientid(Lists) ->
case ets:info(emqx_auth_clientid) of case ets:info(emqx_auth_clientid) of
undefined -> ok; undefined -> ok;
_ -> _ ->
[ mnesia:dirty_write({emqx_auth_clientid, ClientId, Password}) || #{<<"clientid">> := ClientId, [ mnesia:dirty_write({emqx_auth_clientid, ClientId, Password}) || #{<<"clientid">> := ClientId,
<<"password">> := Password} <- Lists ] <<"password">> := Password} <- Lists ]
end. end.
@ -772,14 +772,14 @@ import_auth_username(Lists) ->
case ets:info(emqx_auth_username) of case ets:info(emqx_auth_username) of
undefined -> ok; undefined -> ok;
_ -> _ ->
[ mnesia:dirty_write({emqx_auth_username, Username, Password}) || #{<<"username">> := Username, [ mnesia:dirty_write({emqx_auth_username, Username, Password}) || #{<<"username">> := Username,
<<"password">> := Password} <- Lists ] <<"password">> := Password} <- Lists ]
end. end.
import_auth_mnesia(Auths) -> import_auth_mnesia(Auths) ->
case ets:info(emqx_user) of case ets:info(emqx_user) of
undefined -> ok; undefined -> ok;
_ -> _ ->
[ mnesia:dirty_write({emqx_user, Login, Password, IsSuperuser}) || #{<<"login">> := Login, [ mnesia:dirty_write({emqx_user, Login, Password, IsSuperuser}) || #{<<"login">> := Login,
<<"password">> := Password, <<"password">> := Password,
<<"is_superuser">> := IsSuperuser} <- Auths ] <<"is_superuser">> := IsSuperuser} <- Auths ]
@ -788,14 +788,14 @@ import_auth_mnesia(Auths) ->
import_acl_mnesia(Acls) -> import_acl_mnesia(Acls) ->
case ets:info(emqx_acl) of case ets:info(emqx_acl) of
undefined -> ok; undefined -> ok;
_ -> _ ->
[ mnesia:dirty_write({emqx_acl ,Login, Topic, Action, Allow}) || #{<<"login">> := Login, [ mnesia:dirty_write({emqx_acl ,Login, Topic, Action, Allow}) || #{<<"login">> := Login,
<<"topic">> := Topic, <<"topic">> := Topic,
<<"action">> := Action, <<"action">> := Action,
<<"allow">> := Allow} <- Acls ] <<"allow">> := Allow} <- Acls ]
end. end.
import_schemas(Schemas) -> import_schemas(Schemas) ->
case ets:info(emqx_schema) of case ets:info(emqx_schema) of
undefined -> ok; undefined -> ok;
_ -> [emqx_schema_registry:add_schema(emqx_schema_api:make_schema_params(Schema)) || Schema <- Schemas] _ -> [emqx_schema_registry:add_schema(emqx_schema_api:make_schema_params(Schema)) || Schema <- Schemas]
@ -817,7 +817,7 @@ to_version(Version) when is_list(Version) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
enable_telemetry() -> enable_telemetry() ->
[enable_telemetry(Node) || Node <- ekka_mnesia:running_nodes()], ok. lists:foreach(fun enable_telemetry/1,ekka_mnesia:running_nodes()).
enable_telemetry(Node) when Node =:= node() -> enable_telemetry(Node) when Node =:= node() ->
emqx_telemetry:enable(); emqx_telemetry:enable();
@ -825,7 +825,7 @@ enable_telemetry(Node) ->
rpc_call(Node, enable_telemetry, [Node]). rpc_call(Node, enable_telemetry, [Node]).
disable_telemetry() -> disable_telemetry() ->
[disable_telemetry(Node) || Node <- ekka_mnesia:running_nodes()], ok. lists:foreach(fun disable_telemetry/1,ekka_mnesia:running_nodes()).
disable_telemetry(Node) when Node =:= node() -> disable_telemetry(Node) when Node =:= node() ->
emqx_telemetry:disable(); emqx_telemetry:disable();

View File

@ -37,7 +37,9 @@ paginate(Tables, Params, RowFun) ->
Limit = limit(Params), Limit = limit(Params),
Cursor = qlc:cursor(Qh), Cursor = qlc:cursor(Qh),
case Page > 1 of case Page > 1 of
true -> qlc:next_answers(Cursor, (Page - 1) * Limit); true ->
_ = qlc:next_answers(Cursor, (Page - 1) * Limit),
ok;
false -> ok false -> ok
end, end,
Rows = qlc:next_answers(Cursor, Limit), Rows = qlc:next_answers(Cursor, Limit),

View File

@ -111,7 +111,7 @@ deactivate(_Bindings, Params) ->
do_deactivate(Node, Name). do_deactivate(Node, Name).
delete_deactivated(Bindings, _Params) when map_size(Bindings) == 0 -> delete_deactivated(Bindings, _Params) when map_size(Bindings) == 0 ->
emqx_mgmt:delete_all_deactivated_alarms(), _ = emqx_mgmt:delete_all_deactivated_alarms(),
{ok, #{code => ?SUCCESS}}; {ok, #{code => ?SUCCESS}};
delete_deactivated(#{node := Node}, _Params) -> delete_deactivated(#{node := Node}, _Params) ->
@ -134,4 +134,4 @@ do_deactivate(Node, Name) ->
minirest:return(); minirest:return();
{error, Reason} -> {error, Reason} ->
minirest:return({error, Reason}) minirest:return({error, Reason})
end. end.

View File

@ -187,11 +187,11 @@ do_import(Filename) ->
emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])), emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])), emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),
emqx_mgmt:import_modules(maps:get(<<"modules">>, Data, [])), emqx_mgmt:import_modules(maps:get(<<"modules">>, Data, [])),
emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), _ = emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])), _ = emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version), _ = emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version), _ = emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])), _ = emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])),
logger:debug("The emqx data has been imported successfully"), logger:debug("The emqx data has been imported successfully"),
ok ok
catch Class:Reason:Stack -> catch Class:Reason:Stack ->

View File

@ -26,7 +26,7 @@
start(_Type, _Args) -> start(_Type, _Args) ->
{ok, Sup} = emqx_mgmt_sup:start_link(), {ok, Sup} = emqx_mgmt_sup:start_link(),
emqx_mgmt_auth:add_default_app(), _ = emqx_mgmt_auth:add_default_app(),
emqx_mgmt_http:start_listeners(), emqx_mgmt_http:start_listeners(),
emqx_mgmt_cli:load(), emqx_mgmt_cli:load(),
{ok, Sup}. {ok, Sup}.

View File

@ -416,7 +416,7 @@ log(["primary-level"]) ->
emqx_ctl:print("~s~n", [Level]); emqx_ctl:print("~s~n", [Level]);
log(["primary-level", Level]) -> log(["primary-level", Level]) ->
emqx_logger:set_primary_log_level(list_to_atom(Level)), _ = emqx_logger:set_primary_log_level(list_to_atom(Level)),
emqx_ctl:print("~s~n", [emqx_logger:get_primary_log_level()]); emqx_ctl:print("~s~n", [emqx_logger:get_primary_log_level()]);
log(["handlers", "list"]) -> log(["handlers", "list"]) ->
@ -606,11 +606,11 @@ data(["import", Filename]) ->
emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])), emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])),
emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])), emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])), emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),
emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), _ = emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])), _ = emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, [])), _ = emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, [])),
emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, [])), _ = emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, [])),
emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])), _ = emqx_mgmt:import_schemas(maps:get(<<"schemas">>, Data, [])),
emqx_ctl:print("The emqx data has been imported successfully.~n") emqx_ctl:print("The emqx data has been imported successfully.~n")
catch Class:Reason:Stack -> catch Class:Reason:Stack ->
emqx_ctl:print("The emqx data import failed due: ~0p~n", [{Class,Reason,Stack}]) emqx_ctl:print("The emqx data import failed due: ~0p~n", [{Class,Reason,Stack}])

View File

@ -52,25 +52,25 @@
%% Called when the plugin application start %% Called when the plugin application start
load(Env) -> load(Env) ->
emqx:hook('client.connect', {?MODULE, on_client_connect, [Env]}), hook('client.connect', {?MODULE, on_client_connect, [Env]}),
emqx:hook('client.connack', {?MODULE, on_client_connack, [Env]}), hook('client.connack', {?MODULE, on_client_connack, [Env]}),
emqx:hook('client.connected', {?MODULE, on_client_connected, [Env]}), hook('client.connected', {?MODULE, on_client_connected, [Env]}),
emqx:hook('client.disconnected', {?MODULE, on_client_disconnected, [Env]}), hook('client.disconnected', {?MODULE, on_client_disconnected, [Env]}),
emqx:hook('client.authenticate', {?MODULE, on_client_authenticate, [Env]}), hook('client.authenticate', {?MODULE, on_client_authenticate, [Env]}),
emqx:hook('client.check_acl', {?MODULE, on_client_check_acl, [Env]}), hook('client.check_acl', {?MODULE, on_client_check_acl, [Env]}),
emqx:hook('client.subscribe', {?MODULE, on_client_subscribe, [Env]}), hook('client.subscribe', {?MODULE, on_client_subscribe, [Env]}),
emqx:hook('client.unsubscribe', {?MODULE, on_client_unsubscribe, [Env]}), hook('client.unsubscribe', {?MODULE, on_client_unsubscribe, [Env]}),
emqx:hook('session.created', {?MODULE, on_session_created, [Env]}), hook('session.created', {?MODULE, on_session_created, [Env]}),
emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Env]}), hook('session.subscribed', {?MODULE, on_session_subscribed, [Env]}),
emqx:hook('session.unsubscribed',{?MODULE, on_session_unsubscribed, [Env]}), hook('session.unsubscribed',{?MODULE, on_session_unsubscribed, [Env]}),
emqx:hook('session.resumed', {?MODULE, on_session_resumed, [Env]}), hook('session.resumed', {?MODULE, on_session_resumed, [Env]}),
emqx:hook('session.discarded', {?MODULE, on_session_discarded, [Env]}), hook('session.discarded', {?MODULE, on_session_discarded, [Env]}),
emqx:hook('session.takeovered', {?MODULE, on_session_takeovered, [Env]}), hook('session.takeovered', {?MODULE, on_session_takeovered, [Env]}),
emqx:hook('session.terminated', {?MODULE, on_session_terminated, [Env]}), hook('session.terminated', {?MODULE, on_session_terminated, [Env]}),
emqx:hook('message.publish', {?MODULE, on_message_publish, [Env]}), hook('message.publish', {?MODULE, on_message_publish, [Env]}),
emqx:hook('message.delivered', {?MODULE, on_message_delivered, [Env]}), hook('message.delivered', {?MODULE, on_message_delivered, [Env]}),
emqx:hook('message.acked', {?MODULE, on_message_acked, [Env]}), hook('message.acked', {?MODULE, on_message_acked, [Env]}),
emqx:hook('message.dropped', {?MODULE, on_message_dropped, [Env]}). hook('message.dropped', {?MODULE, on_message_dropped, [Env]}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Client Lifecircle Hooks %% Client Lifecircle Hooks
@ -186,3 +186,8 @@ unload() ->
emqx:unhook('message.acked', {?MODULE, on_message_acked}), emqx:unhook('message.acked', {?MODULE, on_message_acked}),
emqx:unhook('message.dropped', {?MODULE, on_message_dropped}). emqx:unhook('message.dropped', {?MODULE, on_message_dropped}).
hook(Name, MFA) ->
case emqx:hook(Name, MFA) of
ok -> ok;
{error, already_exists} -> ok
end.

View File

@ -34,10 +34,10 @@
%% Called when the plugin application start %% Called when the plugin application start
load(Env) -> load(Env) ->
ets:new(?TAB, [set, named_table, {keypos, #psk_entry.psk_id}]), _ = ets:new(?TAB, [set, named_table, {keypos, #psk_entry.psk_id}]),
{ok, PskFile} = file:open(get_value(path, Env), [read, raw, binary, read_ahead]), {ok, PskFile} = file:open(get_value(path, Env), [read, raw, binary, read_ahead]),
preload_psks(PskFile, bin(get_value(delimiter, Env))), preload_psks(PskFile, bin(get_value(delimiter, Env))),
file:close(PskFile), _ = file:close(PskFile),
emqx:hook('tls_handshake.psk_lookup', fun ?MODULE:on_psk_lookup/2, []). emqx:hook('tls_handshake.psk_lookup', fun ?MODULE:on_psk_lookup/2, []).
%% Called when the plugin application stop %% Called when the plugin application stop

View File

@ -25,7 +25,7 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_psk_file_sup:start_link(), {ok, Sup} = emqx_psk_file_sup:start_link(),
emqx_psk_file:load( _ = emqx_psk_file:load(
application:get_all_env(emqx_psk_file)), application:get_all_env(emqx_psk_file)),
{ok, Sup}. {ok, Sup}.

View File

@ -56,8 +56,9 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
load(Env) -> load(Env) ->
emqx:hook('session.subscribed', fun ?MODULE:on_session_subscribed/3, []), _ = emqx:hook('session.subscribed', fun ?MODULE:on_session_subscribed/3, []),
emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]). _ = emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
ok.
unload() -> unload() ->
emqx:unhook('message.publish', fun ?MODULE:on_message_publish/2), emqx:unhook('message.publish', fun ?MODULE:on_message_publish/2),
@ -169,15 +170,17 @@ handle_info(stats, State = #state{stats_fun = StatsFun}) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info(expire, State) -> handle_info(expire, State) ->
expire_messages(), ok = expire_messages(),
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]), ?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State = #state{stats_timer = TRef1, expiry_timer = TRef2}) -> terminate(_Reason, #state{stats_timer = TRef1, expiry_timer = TRef2} = State) ->
timer:cancel(TRef1), timer:cancel(TRef2). _ = timer:cancel(TRef1),
_ = timer:cancel(TRef2),
ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -248,11 +251,12 @@ expire_messages() ->
NowMs = erlang:system_time(millisecond), NowMs = erlang:system_time(millisecond),
MsHd = #retained{topic = '$1', msg = '_', expiry_time = '$3'}, MsHd = #retained{topic = '$1', msg = '_', expiry_time = '$3'},
Ms = [{MsHd, [{'=/=','$3',0}, {'<','$3',NowMs}], ['$1']}], Ms = [{MsHd, [{'=/=','$3',0}, {'<','$3',NowMs}], ['$1']}],
mnesia:transaction( {atomic, _} = mnesia:transaction(
fun() -> fun() ->
Keys = mnesia:select(?TAB, Ms, write), Keys = mnesia:select(?TAB, Ms, write),
lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys) lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys)
end). end),
ok.
-spec(read_messages(emqx_types:topic()) -spec(read_messages(emqx_types:topic())
-> [emqx_types:message()]). -> [emqx_types:message()]).

View File

@ -60,7 +60,7 @@
{ id :: action_instance_id() { id :: action_instance_id()
, name :: action_name() , name :: action_name()
, fallbacks :: list(#action_instance{}) , fallbacks :: list(#action_instance{})
, args :: #{atom() => term()} %% the args got from API for initializing action_instance , args :: #{binary() => term()} %% the args got from API for initializing action_instance
}). }).
-record(rule, -record(rule,
@ -82,7 +82,7 @@
{ id :: resource_id() { id :: resource_id()
, type :: resource_type_name() , type :: resource_type_name()
, config :: #{} %% the configs got from API for initializing resource , config :: #{} %% the configs got from API for initializing resource
, created_at :: erlang:timestamp() , created_at :: integer() %% epoch in millisecond precision
, description :: binary() , description :: binary()
}). }).

View File

@ -39,6 +39,7 @@
, get_resource_status/1 , get_resource_status/1
, get_resource_params/1 , get_resource_params/1
, delete_resource/1 , delete_resource/1
, ensure_resource_deleted/1
]). ]).
-export([ init_resource/4 -export([ init_resource/4
@ -154,6 +155,7 @@ module_attributes(Module) ->
%% APIs for rules and resources %% APIs for rules and resources
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-dialyzer([{nowarn_function, [create_rule/1, rule_id/0]}]).
-spec(create_rule(#{}) -> {ok, rule()} | no_return()). -spec(create_rule(#{}) -> {ok, rule()} | no_return()).
create_rule(Params = #{rawsql := Sql, actions := Actions}) -> create_rule(Params = #{rawsql := Sql, actions := Actions}) ->
case emqx_rule_sqlparser:parse_select(Sql) of case emqx_rule_sqlparser:parse_select(Sql) of
@ -178,7 +180,7 @@ create_rule(Params = #{rawsql := Sql, actions := Actions}) ->
Error -> error(Error) Error -> error(Error)
end. end.
-spec(update_rule(#{}) -> {ok, rule()} | no_return()). -spec(update_rule(#{id := binary(), _=>_}) -> {ok, rule()} | {error, {not_found, rule_id()}}).
update_rule(Params = #{id := RuleId}) -> update_rule(Params = #{id := RuleId}) ->
case emqx_rule_registry:get_rule(RuleId) of case emqx_rule_registry:get_rule(RuleId) of
{ok, Rule0} -> {ok, Rule0} ->
@ -205,7 +207,7 @@ delete_rule(RuleId) ->
ok ok
end. end.
-spec(create_resource(#{}) -> {ok, resource()} | {error, Reason :: term()}). -spec(create_resource(#{type := _, config := _, _ => _}) -> {ok, resource()} | {error, Reason :: term()}).
create_resource(#{type := Type, config := Config} = Params) -> create_resource(#{type := Type, config := Config} = Params) ->
case emqx_rule_registry:find_resource_type(Type) of case emqx_rule_registry:find_resource_type(Type) of
{ok, #resource_type{on_create = {M, F}, params_spec = ParamSpec}} -> {ok, #resource_type{on_create = {M, F}, params_spec = ParamSpec}} ->
@ -214,7 +216,9 @@ create_resource(#{type := Type, config := Config} = Params) ->
Resource = #resource{id = ResId, Resource = #resource{id = ResId,
type = Type, type = Type,
config = Config, config = Config,
description = iolist_to_binary(maps:get(description, Params, ""))}, description = iolist_to_binary(maps:get(description, Params, "")),
created_at = erlang:system_time(millisecond)
},
ok = emqx_rule_registry:add_resource(Resource), ok = emqx_rule_registry:add_resource(Resource),
%% Note that we will return OK in case of resource creation failure, %% Note that we will return OK in case of resource creation failure,
%% users can always re-start the resource later. %% users can always re-start the resource later.
@ -230,14 +234,14 @@ start_resource(ResId) ->
{ok, #resource{type = ResType, config = Config}} -> {ok, #resource{type = ResType, config = Config}} ->
{ok, #resource_type{on_create = {Mod, Create}}} {ok, #resource_type{on_create = {Mod, Create}}}
= emqx_rule_registry:find_resource_type(ResType), = emqx_rule_registry:find_resource_type(ResType),
init_resource(Mod, Create, ResId, Config), _ = init_resource(Mod, Create, ResId, Config),
refresh_actions_of_a_resource(ResId), refresh_actions_of_a_resource(ResId),
ok; ok;
not_found -> not_found ->
{error, {resource_not_found, ResId}} {error, {resource_not_found, ResId}}
end. end.
-spec(test_resource(#{}) -> ok | {error, Reason :: term()}). -spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
test_resource(#{type := Type, config := Config}) -> test_resource(#{type := Type, config := Config}) ->
case emqx_rule_registry:find_resource_type(Type) of case emqx_rule_registry:find_resource_type(Type) of
{ok, #resource_type{on_create = {ModC,Create}, on_destroy = {ModD,Destroy}, params_spec = ParamSpec}} -> {ok, #resource_type{on_create = {ModC,Create}, on_destroy = {ModD,Destroy}, params_spec = ParamSpec}} ->
@ -283,6 +287,12 @@ delete_resource(ResId) ->
{error, {resource_not_found, ResId}} {error, {resource_not_found, ResId}}
end. end.
%% @doc Ensure resource deleted. `resource_not_found` error is discarded.
-spec(ensure_resource_deleted(resource_id()) -> ok).
ensure_resource_deleted(ResId) ->
_ = delete_resource(ResId),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Re-establish resources %% Re-establish resources
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -364,8 +374,7 @@ with_resource_params(Args = #{<<"$resource">> := ResId}) ->
end; end;
with_resource_params(Args) -> Args. with_resource_params(Args) -> Args.
may_update_rule_params(Rule, Params) when map_size(Params) =:= 0 -> -dialyzer([{nowarn_function, may_update_rule_params/2}]).
Rule;
may_update_rule_params(Rule, Params = #{rawsql := SQL}) -> may_update_rule_params(Rule, Params = #{rawsql := SQL}) ->
case emqx_rule_sqlparser:parse_select(SQL) of case emqx_rule_sqlparser:parse_select(SQL) of
{ok, Select} -> {ok, Select} ->
@ -530,12 +539,12 @@ fetch_resource_status(Module, OnStatus, ResId) ->
end. end.
refresh_actions_of_a_resource(ResId) -> refresh_actions_of_a_resource(ResId) ->
[refresh_actions(Actions, R = fun (#action_instance{args = #{<<"$resource">> := ResId0}})
fun (#action_instance{args = #{<<"$resource">> := ResId0}})
when ResId0 =:= ResId -> true; when ResId0 =:= ResId -> true;
(_) -> false (_) -> false
end) end,
|| #rule{actions = Actions} <- emqx_rule_registry:get_rules()]. F = fun(#rule{actions = Actions}) -> refresh_actions(Actions, R) end,
lists:foreach(F, emqx_rule_registry:get_rules()).
refresh_actions(Actions) -> refresh_actions(Actions) ->
refresh_actions(Actions, fun(_) -> true end). refresh_actions(Actions, fun(_) -> true end).

View File

@ -22,6 +22,9 @@
-import(minirest, [return/1]). -import(minirest, [return/1]).
%% A lot of case clause no_match:es from rule_events.hrl
-dialyzer(no_match).
-rest_api(#{name => create_rule, -rest_api(#{name => create_rule,
method => 'POST', method => 'POST',
path => "/rules/", path => "/rules/",
@ -352,7 +355,7 @@ start_resource(#{id := Id}, _Params) ->
delete_resource(#{id := Id}, _Params) -> delete_resource(#{id := Id}, _Params) ->
try try
emqx_rule_engine:delete_resource(Id), ok = emqx_rule_engine:ensure_resource_deleted(Id),
return(ok) return(ok)
catch catch
_Error:{throw,Reason} -> _Error:{throw,Reason} ->

View File

@ -31,8 +31,8 @@ start_link() ->
init([]) -> init([]) ->
Opts = [public, named_table, set, {read_concurrency, true}], Opts = [public, named_table, set, {read_concurrency, true}],
ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]), _ = ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]),
ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
Registry = #{id => emqx_rule_registry, Registry = #{id => emqx_rule_registry,
start => {emqx_rule_registry, start_link, []}, start => {emqx_rule_registry, start_link, []},
restart => permanent, restart => permanent,

View File

@ -56,9 +56,10 @@
-endif. -endif.
load(Env) -> load(Env) ->
[emqx_hooks:add(HookPoint, {?MODULE, hook_fun(HookPoint), [hook_conf(HookPoint, Env)]}) lists:foreach(
|| HookPoint <- ?SUPPORTED_HOOK], fun(HookPoint) ->
ok. ok = emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [hook_conf(HookPoint, Env)]})
end, ?SUPPORTED_HOOK).
unload(_Env) -> unload(_Env) ->
[emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}) [emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
@ -291,7 +292,8 @@ may_publish_and_apply(EventName, GenEventMsg, #{enabled := true, qos := QoS}) ->
EventMsg = GenEventMsg(), EventMsg = GenEventMsg(),
case emqx_json:safe_encode(EventMsg) of case emqx_json:safe_encode(EventMsg) of
{ok, Payload} -> {ok, Payload} ->
emqx_broker:safe_publish(make_msg(QoS, EventTopic, Payload)); _ = emqx_broker:safe_publish(make_msg(QoS, EventTopic, Payload)),
ok;
{error, _Reason} -> {error, _Reason} ->
?LOG(error, "Failed to encode event msg for ~p, msg: ~p", [EventName, EventMsg]) ?LOG(error, "Failed to encode event msg for ~p, msg: ~p", [EventName, EventMsg])
end, end,

View File

@ -515,25 +515,28 @@ sprintf_s(Format, Args) when is_list(Args) ->
erlang:iolist_to_binary(io_lib:format(binary_to_list(Format), Args)). erlang:iolist_to_binary(io_lib:format(binary_to_list(Format), Args)).
pad(S, Len) when is_binary(S), is_integer(Len) -> pad(S, Len) when is_binary(S), is_integer(Len) ->
iolist_to_binary(string:pad(S, Len, trailing)). iolist_to_binary(string:pad(S, Len, trailing)).
pad(S, Len, <<"trailing">>) when is_binary(S), is_integer(Len) -> pad(S, Len, <<"trailing">>) when is_binary(S), is_integer(Len) ->
iolist_to_binary(string:pad(S, Len, trailing)); iolist_to_binary(string:pad(S, Len, trailing));
pad(S, Len, <<"both">>) when is_binary(S), is_integer(Len) -> pad(S, Len, <<"both">>) when is_binary(S), is_integer(Len) ->
iolist_to_binary(string:pad(S, Len, both)); iolist_to_binary(string:pad(S, Len, both));
pad(S, Len, <<"leading">>) when is_binary(S), is_integer(Len) -> pad(S, Len, <<"leading">>) when is_binary(S), is_integer(Len) ->
iolist_to_binary(string:pad(S, Len, leading)). iolist_to_binary(string:pad(S, Len, leading)).
pad(S, Len, <<"trailing">>, Char) when is_binary(S), is_integer(Len), is_binary(Char) -> pad(S, Len, <<"trailing">>, Char) when is_binary(S), is_integer(Len), is_binary(Char) ->
iolist_to_binary(string:pad(S, Len, trailing, Char)); Chars = unicode:characters_to_list(Char, utf8),
iolist_to_binary(string:pad(S, Len, trailing, Chars));
pad(S, Len, <<"both">>, Char) when is_binary(S), is_integer(Len), is_binary(Char) -> pad(S, Len, <<"both">>, Char) when is_binary(S), is_integer(Len), is_binary(Char) ->
iolist_to_binary(string:pad(S, Len, both, Char)); Chars = unicode:characters_to_list(Char, utf8),
iolist_to_binary(string:pad(S, Len, both, Chars));
pad(S, Len, <<"leading">>, Char) when is_binary(S), is_integer(Len), is_binary(Char) -> pad(S, Len, <<"leading">>, Char) when is_binary(S), is_integer(Len), is_binary(Char) ->
iolist_to_binary(string:pad(S, Len, leading, Char)). Chars = unicode:characters_to_list(Char, utf8),
iolist_to_binary(string:pad(S, Len, leading, Chars)).
replace(SrcStr, P, RepStr) when is_binary(SrcStr), is_binary(P), is_binary(RepStr) -> replace(SrcStr, P, RepStr) when is_binary(SrcStr), is_binary(P), is_binary(RepStr) ->
iolist_to_binary(string:replace(SrcStr, P, RepStr, all)). iolist_to_binary(string:replace(SrcStr, P, RepStr, all)).

View File

@ -99,18 +99,18 @@
-record(state, { -record(state, {
metric_ids = sets:new(), metric_ids = sets:new(),
rule_speeds :: #{rule_id() => #rule_speed{}}, rule_speeds :: undefined | #{rule_id() => #rule_speed{}},
overall_rule_speed :: #rule_speed{} overall_rule_speed :: #rule_speed{}
}). }).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% APIs %% APIs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec(create_rule_metrics(rule_id()) -> Ref :: reference()). -spec(create_rule_metrics(rule_id()) -> Ref :: counters:counters_ref()).
create_rule_metrics(Id) -> create_rule_metrics(Id) ->
gen_server:call(?MODULE, {create_rule_metrics, Id}). gen_server:call(?MODULE, {create_rule_metrics, Id}).
-spec(create_metrics(rule_id()) -> Ref :: reference()). -spec(create_metrics(rule_id()) -> Ref :: counters:counters_ref()).
create_metrics(Id) -> create_metrics(Id) ->
gen_server:call(?MODULE, {create_metrics, Id}). gen_server:call(?MODULE, {create_metrics, Id}).
@ -133,7 +133,7 @@ get(Id, Metric) ->
get_overall(Metric) -> get_overall(Metric) ->
emqx_metrics:val(Metric). emqx_metrics:val(Metric).
-spec(get_rule_speed(atom()) -> map()). -spec(get_rule_speed(rule_id()) -> map()).
get_rule_speed(Id) -> get_rule_speed(Id) ->
gen_server:call(?MODULE, {get_rule_speed, Id}). gen_server:call(?MODULE, {get_rule_speed, Id}).
@ -157,14 +157,16 @@ get_action_metrics(Id) ->
taken => get_actions_taken(Id) taken => get_actions_taken(Id)
}. }.
-spec(inc(rule_id(), atom()) -> ok). -spec inc(rule_id(), atom()) -> ok.
inc(Id, Metric) -> inc(Id, Metric) ->
inc(Id, Metric, 1). inc(Id, Metric, 1).
-spec inc(rule_id(), atom(), pos_integer()) -> ok.
inc(Id, Metric, Val) -> inc(Id, Metric, Val) ->
counters:add(couters_ref(Id), metrics_idx(Metric), Val), counters:add(couters_ref(Id), metrics_idx(Metric), Val),
inc_overall(Metric, Val). inc_overall(Metric, Val).
-spec(inc_overall(rule_id(), atom()) -> ok). -spec(inc_overall(atom(), pos_integer()) -> ok).
inc_overall(Metric, Val) -> inc_overall(Metric, Val) ->
emqx_metrics:inc(Metric, Val). emqx_metrics:inc(Metric, Val).

View File

@ -166,6 +166,10 @@ start_link() ->
get_rules() -> get_rules() ->
get_all_records(?RULE_TAB). get_all_records(?RULE_TAB).
%% TODO: emqx_rule_utils:can_topic_match_oneof(Topic::any(), For::atom())
%% will never return since it differs in the 2nd argument from the success
%% typing arguments: (any(), [binary() | ['' | '#' | '+' | binary()]])
-dialyzer([{nowarn_function, get_rules_for/1}]).
-spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())). -spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
get_rules_for(Topic) -> get_rules_for(Topic) ->
[Rule || Rule = #rule{for = For} <- get_rules(), [Rule || Rule = #rule{for = For} <- get_rules(),
@ -329,9 +333,10 @@ remove_resource_params(ResId) ->
%% @private %% @private
delete_resource(ResId) -> delete_resource(ResId) ->
[[ResId =:= ResId1 andalso throw({dependency_exists, {rule, Id}}) %% TODO, change to foreache:s
|| #action_instance{args = #{<<"$resource">> := ResId1}} <- Actions] _ = [[ResId =:= ResId1 andalso throw({dependency_exists, {rule, Id}})
|| #rule{id = Id, actions = Actions} <- get_rules()], || #action_instance{args = #{<<"$resource">> := ResId1}} <- Actions]
|| #rule{id = Id, actions = Actions} <- get_rules()],
mnesia:delete(?RES_TAB, ResId, write). mnesia:delete(?RES_TAB, ResId, write).
%% @private %% @private

View File

@ -49,7 +49,7 @@ apply_rules([], _Input) ->
apply_rules([#rule{enabled = false}|More], Input) -> apply_rules([#rule{enabled = false}|More], Input) ->
apply_rules(More, Input); apply_rules(More, Input);
apply_rules([Rule = #rule{id = RuleID}|More], Input) -> apply_rules([Rule = #rule{id = RuleID}|More], Input) ->
try apply_rule(Rule, Input) try apply_rule_discard_result(Rule, Input)
catch catch
%% ignore the errors if select or match failed %% ignore the errors if select or match failed
_:{select_and_transform_error, Error} -> _:{select_and_transform_error, Error} ->
@ -70,6 +70,13 @@ apply_rules([Rule = #rule{id = RuleID}|More], Input) ->
end, end,
apply_rules(More, Input). apply_rules(More, Input).
apply_rule_discard_result(Rule, Input) ->
%% TODO check if below two clauses are ok to discard:
%% {'error','nomatch'}
%% {'ok',[any()]}
_ = apply_rule(Rule, Input),
ok.
apply_rule(Rule = #rule{id = RuleID}, Input) -> apply_rule(Rule = #rule{id = RuleID}, Input) ->
clear_rule_payload(), clear_rule_payload(),
do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
@ -160,6 +167,7 @@ select_and_collect([Field|More], Input, {Output, LastKV}) ->
{nested_put(Key, Val, Output), LastKV}). {nested_put(Key, Val, Output), LastKV}).
%% Filter each item got from FOREACH %% Filter each item got from FOREACH
-dialyzer({nowarn_function, filter_collection/4}).
filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) -> filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) ->
lists:filtermap( lists:filtermap(
fun(Item) -> fun(Item) ->
@ -242,7 +250,7 @@ take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = A
error:{badfun, _Func}:_ST -> error:{badfun, _Func}:_ST ->
%?LOG(warning, "Action ~p maybe outdated, refresh it and try again." %?LOG(warning, "Action ~p maybe outdated, refresh it and try again."
% "Func: ~p~nST:~0p", [Id, Func, ST]), % "Func: ~p~nST:~0p", [Id, Func, ST]),
trans_action_on(Id, fun() -> _ = trans_action_on(Id, fun() ->
emqx_rule_engine:refresh_actions([ActInst]) emqx_rule_engine:refresh_actions([ActInst])
end, 5000), end, 5000),
emqx_rule_metrics:inc_actions_retry(Id), emqx_rule_metrics:inc_actions_retry(Id),
@ -291,11 +299,11 @@ wait_action_on(Id, RetryN) ->
handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) -> handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) ->
?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]), ?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]),
take_actions(Fallbacks, Selected, Envs, continue), _ = take_actions(Fallbacks, Selected, Envs, continue),
failed; failed;
handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) -> handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) ->
?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]), ?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]),
take_actions(Fallbacks, Selected, Envs, continue), _ = take_actions(Fallbacks, Selected, Envs, continue),
error({take_action_failed, {Id, Reason}}). error({take_action_failed, {Id, Reason}}).
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) -> eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->

View File

@ -49,6 +49,10 @@
-export_type([select/0]). -export_type([select/0]).
%% Dialyzer gives up on the generated code.
%% probably due to stack depth, or inlines.
-dialyzer({nowarn_function, [parse_select/1]}).
%% Parse one select statement. %% Parse one select statement.
-spec(parse_select(string() | binary()) -spec(parse_select(string() | binary())
-> {ok, select()} | {parse_error, term()} | {lex_error, term()}). -> {ok, select()} | {parse_error, term()} | {lex_error, term()}).
@ -76,7 +80,7 @@ parse_select(Sql) ->
end end
catch catch
_Error:Reason:StackTrace -> _Error:Reason:StackTrace ->
{parse_error, Reason, StackTrace} {parse_error, {Reason, StackTrace}}
end. end.
-spec(select_fields(select()) -> list(field())). -spec(select_fields(select()) -> list(field())).

View File

@ -21,24 +21,30 @@
-export([ test/1 -export([ test/1
]). ]).
%% Dialyzer gives up on the generated code.
%% probably due to stack depth, or inlines.
-dialyzer({nowarn_function, [test/1,
test_rule/4,
flatten/1,
sql_test_action/0,
fill_default_values/2
]}).
-spec(test(#{}) -> {ok, Result::map()} | no_return()). -spec(test(#{}) -> {ok, Result::map()} | no_return()).
test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) -> test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
case emqx_rule_sqlparser:parse_select(Sql) of {ok, Select} = emqx_rule_sqlparser:parse_select(Sql),
{ok, Select} -> InTopic = maps:get(<<"topic">>, Context, <<>>),
InTopic = maps:get(<<"topic">>, Context, <<>>), EventTopics = emqx_rule_sqlparser:select_from(Select),
EventTopics = emqx_rule_sqlparser:select_from(Select), case lists:all(fun is_publish_topic/1, EventTopics) of
case lists:all(fun is_publish_topic/1, EventTopics) of true ->
true -> %% test if the topic matches the topic filters in the rule
%% test if the topic matches the topic filters in the rule case emqx_rule_utils:can_topic_match_oneof(InTopic, EventTopics) of
case emqx_rule_utils:can_topic_match_oneof(InTopic, EventTopics) of true -> test_rule(Sql, Select, Context, EventTopics);
true -> test_rule(Sql, Select, Context, EventTopics); false -> {error, nomatch}
false -> {error, nomatch}
end;
false ->
%% the rule is for both publish and events, test it directly
test_rule(Sql, Select, Context, EventTopics)
end; end;
Error -> error(Error) false ->
%% the rule is for both publish and events, test it directly
test_rule(Sql, Select, Context, EventTopics)
end. end.
test_rule(Sql, Select, Context, EventTopics) -> test_rule(Sql, Select, Context, EventTopics) ->

View File

@ -31,25 +31,31 @@
%% APIs %% APIs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Validate the params according the spec and return a new spec. %% Validate the params according the spec.
%% Note that this function will throw out exceptions in case of %% Note that this function throws exception in case of validation failure.
%% validation failure. -spec(validate_params(params(), params_spec()) -> ok).
-spec(validate_params(params(), params_spec()) -> params()).
validate_params(Params, ParamsSepc) -> validate_params(Params, ParamsSepc) ->
maps:map(fun(Name, Spec) -> F = fun(Name, Spec) ->
do_validate_param(Name, Spec, Params) do_validate_param(Name, Spec, Params)
end, ParamsSepc), end,
ok. map_foreach(F, ParamsSepc).
-spec(validate_spec(params_spec()) -> ok). -spec(validate_spec(params_spec()) -> ok).
validate_spec(ParamsSepc) -> validate_spec(ParamsSepc) -> map_foreach(fun do_validate_spec/2, ParamsSepc).
maps:map(fun do_validate_spec/2, ParamsSepc),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal Functions %% Internal Functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
map_foreach(Fun, Map) ->
Iterator = maps:iterator(Map),
map_foreach_loop(Fun, maps:next(Iterator)).
map_foreach_loop(_Fun, none) -> ok;
map_foreach_loop(Fun, {Key, Value, Iterator}) ->
_ = Fun(Key, Value),
map_foreach_loop(Fun, maps:next(Iterator)).
do_validate_param(Name, Spec = #{required := true}, Params) -> do_validate_param(Name, Spec = #{required := true}, Params) ->
find_field(Name, Params, find_field(Name, Params,
fun (not_found) -> error({required_field_missing, Name}); fun (not_found) -> error({required_field_missing, Name});
@ -171,5 +177,5 @@ do_find_field([F | Fields], Spec, Func) ->
end. end.
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
bin(Str) when is_list(Str) -> atom_to_list(Str); bin(Str) when is_list(Str) -> iolist_to_binary(Str);
bin(Bin) when is_binary(Bin) -> Bin. bin(Bin) when is_binary(Bin) -> Bin.

View File

@ -30,7 +30,7 @@
start(_Type, _Args) -> start(_Type, _Args) ->
ok = emqx_sasl:init(), ok = emqx_sasl:init(),
emqx_sasl:load(), _ = emqx_sasl:load(),
emqx_sasl_cli:load(), emqx_sasl_cli:load(),
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@ -43,4 +43,4 @@ stop(_State) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
{ok, { {one_for_all, 1, 10}, []} }. {ok, { {one_for_all, 1, 10}, []} }.

View File

@ -217,12 +217,8 @@ nonce() ->
base64:encode([$a + rand:uniform(26) || _ <- lists:seq(1, 10)]). base64:encode([$a + rand:uniform(26) || _ <- lists:seq(1, 10)]).
pbkdf2_sha_1(Password, Salt, IterationCount) -> pbkdf2_sha_1(Password, Salt, IterationCount) ->
case pbkdf2:pbkdf2(sha, Password, Salt, IterationCount) of {ok, Bin} = pbkdf2:pbkdf2(sha, Password, Salt, IterationCount),
{ok, Bin} -> pbkdf2:to_hex(Bin).
pbkdf2:to_hex(Bin);
{error, Reason} ->
error(Reason)
end.
-if(?OTP_RELEASE >= 23). -if(?OTP_RELEASE >= 23).
hmac(Key, Data) -> hmac(Key, Data) ->

View File

@ -74,7 +74,7 @@
sockstate :: emqx_types:sockstate(), sockstate :: emqx_types:sockstate(),
sockname :: {inet:ip_address(), inet:port()}, sockname :: {inet:ip_address(), inet:port()},
peername :: {inet:ip_address(), inet:port()}, peername :: {inet:ip_address(), inet:port()},
channel :: emqx_channel:channel(), channel :: maybe(emqx_channel:channel()),
registry :: emqx_sn_registry:registry(), registry :: emqx_sn_registry:registry(),
clientid :: maybe(binary()), clientid :: maybe(binary()),
username :: maybe(binary()), username :: maybe(binary()),
@ -105,6 +105,8 @@
-define(NO_PEERCERT, undefined). -define(NO_PEERCERT, undefined).
%% TODO: fix when https://github.com/emqx/emqx-sn/pull/170 is merged
-dialyzer([{nowarn_function, [idle/3]}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Exported APIs %% Exported APIs
@ -136,7 +138,8 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
EnableStats = proplists:get_value(enable_stats, Options, false), EnableStats = proplists:get_value(enable_stats, Options, false),
case inet:sockname(Sock) of case inet:sockname(Sock) of
{ok, Sockname} -> {ok, Sockname} ->
Channel = emqx_channel:init(#{sockname => Sockname, Channel = emqx_channel:init(#{socktype => udp,
sockname => Sockname,
peername => Peername, peername => Peername,
protocol => 'mqtt-sn', protocol => 'mqtt-sn',
peercert => ?NO_PEERCERT, peercert => ?NO_PEERCERT,
@ -195,14 +198,18 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State =
idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
topic_id_type = TopicIdType topic_id_type = TopicIdType
}, TopicId, _MsgId, Data)}, }, TopicId, _MsgId, Data)},
State = #state{clientid = ClientId, registry = Registry}) -> State = #state{clientid = ClientId, registry = Registry}) ->
TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of
false -> false -> emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId);
emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId);
true -> <<TopicId:16>> true -> <<TopicId:16>>
end, end,
Msg = emqx_message:make(?NEG_QOS_CLIENT_ID, ?QOS_0, TopicName, Data), case TopicName =/= undefined of
(TopicName =/= undefined) andalso emqx_broker:publish(Msg), true ->
Msg = emqx_message:make(?NEG_QOS_CLIENT_ID, ?QOS_0, TopicName, Data),
emqx_broker:publish(Msg);
false ->
ok
end,
?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId], State), ?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId], State),
{keep_state_and_data, State#state.idle_timeout}; {keep_state_and_data, State#state.idle_timeout};
@ -473,10 +480,10 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
State = #state{sockpid = SockPid, channel = _Channel}) -> State = #state{sockpid = SockPid, channel = _Channel}) ->
?LOG(debug, "RECV ~p", [Data], State), ?LOG(debug, "RECV ~p", [Data], State),
Oct = iolist_size(Data), Oct = iolist_size(Data),
emqx_pd:inc_counter(recv_oct, Oct), inc_counter(recv_oct, Oct),
try emqx_sn_frame:parse(Data) of try emqx_sn_frame:parse(Data) of
{ok, Msg} -> {ok, Msg} ->
emqx_pd:inc_counter(recv_cnt, 1), inc_counter(recv_cnt, 1),
?LOG(info, "RECV ~s at state ~s", ?LOG(info, "RECV ~s at state ~s",
[emqx_sn_frame:format(Msg), StateName], State), [emqx_sn_frame:format(Msg), StateName], State),
{keep_state, State, next_event({incoming, Msg})} {keep_state, State, next_event({incoming, Msg})}
@ -562,10 +569,9 @@ terminate(Reason, _StateName, #state{clientid = ClientId,
channel = Channel, channel = Channel,
registry = Registry}) -> registry = Registry}) ->
emqx_sn_registry:unregister_topic(Registry, ClientId), emqx_sn_registry:unregister_topic(Registry, ClientId),
case {Channel, Reason} of case Channel =:= undefined of
{undefined, _} -> ok; true -> ok;
{_, _} -> false -> emqx_channel:terminate(Reason, Channel)
emqx_channel:terminate(Reason, Channel)
end. end.
code_change(_Vsn, StateName, State, _Extra) -> code_change(_Vsn, StateName, State, _Extra) ->
@ -597,8 +603,8 @@ handle_info(Info, State = #state{channel = Channel}) ->
handle_return(emqx_channel:handle_info(Info, Channel), State). handle_return(emqx_channel:handle_info(Info, Channel), State).
handle_ping(_PingReq, State) -> handle_ping(_PingReq, State) ->
emqx_pd:inc_counter(recv_oct, 2), inc_counter(recv_oct, 2),
emqx_pd:inc_counter(recv_msg, 1), inc_counter(recv_msg, 1),
ok = send_message(?SN_PINGRESP_MSG(), State), ok = send_message(?SN_PINGRESP_MSG(), State),
{keep_state, State}. {keep_state, State}.
@ -947,7 +953,7 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) ->
qos = QoS, retain = Retain}, qos = QoS, retain = Retain},
variable = #mqtt_packet_publish{topic_name = Topic, packet_id = 1000}, variable = #mqtt_packet_publish{topic_name = Topic, packet_id = 1000},
payload = Payload}, payload = Payload},
emqx_broker:publish(emqx_packet:to_message(Publish, ClientId)), _ = emqx_broker:publish(emqx_packet:to_message(Publish, ClientId)),
ok. ok.
do_puback(TopicId, MsgId, ReturnCode, _StateName, do_puback(TopicId, MsgId, ReturnCode, _StateName,
@ -1075,18 +1081,24 @@ transform_fun() ->
fun(Packet, State) -> transform(Packet, FunMsgIdToTopicId, State) end. fun(Packet, State) -> transform(Packet, FunMsgIdToTopicId, State) end.
inc_incoming_stats(Type) -> inc_incoming_stats(Type) ->
emqx_pd:inc_counter(recv_pkt, 1), inc_counter(recv_pkt, 1),
case Type == ?PUBLISH of case Type == ?PUBLISH of
true -> true ->
emqx_pd:inc_counter(recv_msg, 1), inc_counter(recv_msg, 1),
emqx_pd:inc_counter(incoming_pubs, 1); inc_counter(incoming_pubs, 1);
false -> ok false -> ok
end. end.
inc_outgoing_stats(Type) -> inc_outgoing_stats(Type) ->
emqx_pd:inc_counter(send_pkt, 1), inc_counter(send_pkt, 1),
(Type == ?SN_PUBLISH) case Type =:= ?SN_PUBLISH of
andalso emqx_pd:inc_counter(send_msg, 1). true -> inc_counter(send_msg, 1);
false -> ok
end.
next_event(Content) -> next_event(Content) ->
{next_event, cast, Content}. {next_event, cast, Content}.
inc_counter(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc),
ok.

View File

@ -24,7 +24,7 @@
%% STOMP Frame %% STOMP Frame
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-record(stomp_frame, {command, headers = [], body = <<>> :: iolist()}). -record(stomp_frame, {command, headers = [], body = <<>> :: iodata()}).
-type(stomp_frame() :: #stomp_frame{}). -type(stomp_frame() :: #stomp_frame{}).

View File

@ -87,7 +87,11 @@
-define(IS_ESC(Ch), Ch == ?CR; Ch == ?LF; Ch == ?BSL; Ch == ?COLON). -define(IS_ESC(Ch), Ch == ?CR; Ch == ?LF; Ch == ?BSL; Ch == ?COLON).
-record(parser_state, {cmd, headers = [], hdname, acc = <<>>, limit}). -record(parser_state, {cmd,
headers = [],
hdname,
acc = <<>> :: binary(),
limit}).
-record(frame_limit, {max_header_num, max_header_length, max_body_length}). -record(frame_limit, {max_header_num, max_header_length, max_body_length}).

View File

@ -99,12 +99,12 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
send(connected_frame([{<<"version">>, Version}, send(connected_frame([{<<"version">>, Version},
{<<"heart-beat">>, reverse_heartbeats(Heartbeats)}]), NewState); {<<"heart-beat">>, reverse_heartbeats(Heartbeats)}]), NewState);
false -> false ->
send(error_frame(undefined, <<"Login or passcode error!">>), State), _ = send(error_frame(undefined, <<"Login or passcode error!">>), State),
{error, login_or_passcode_error, State} {error, login_or_passcode_error, State}
end; end;
{error, Msg} -> {error, Msg} ->
send(error_frame([{<<"version">>, <<"1.0,1.1,1.2">>}, _ = send(error_frame([{<<"version">>, <<"1.0,1.1,1.2">>},
{<<"content-type">>, <<"text/plain">>}], undefined, Msg), State), {<<"content-type">>, <<"text/plain">>}], undefined, Msg), State),
{error, unsupported_version, State} {error, unsupported_version, State}
end; end;
@ -114,10 +114,9 @@ received(#stomp_frame{command = <<"CONNECT">>}, State = #stomp_proto{connected =
received(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) -> received(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) ->
Topic = header(<<"destination">>, Headers), Topic = header(<<"destination">>, Headers),
Action = fun(State0) -> Action = fun(State0) ->
maybe_send_receipt(receipt_id(Headers), State0), _ = maybe_send_receipt(receipt_id(Headers), State0),
emqx_broker:publish( _ = emqx_broker:publish(
make_mqtt_message(Topic, Headers, iolist_to_binary(Body)) make_mqtt_message(Topic, Headers, iolist_to_binary(Body))),
),
State0 State0
end, end,
case header(<<"transaction">>, Headers) of case header(<<"transaction">>, Headers) of
@ -160,7 +159,7 @@ received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers},
received(#stomp_frame{command = <<"ACK">>, headers = Headers}, State) -> received(#stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
Id = header(<<"id">>, Headers), Id = header(<<"id">>, Headers),
Action = fun(State0) -> Action = fun(State0) ->
maybe_send_receipt(receipt_id(Headers), State0), _ = maybe_send_receipt(receipt_id(Headers), State0),
ack(Id, State0) ack(Id, State0)
end, end,
case header(<<"transaction">>, Headers) of case header(<<"transaction">>, Headers) of
@ -176,7 +175,7 @@ received(#stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
received(#stomp_frame{command = <<"NACK">>, headers = Headers}, State) -> received(#stomp_frame{command = <<"NACK">>, headers = Headers}, State) ->
Id = header(<<"id">>, Headers), Id = header(<<"id">>, Headers),
Action = fun(State0) -> Action = fun(State0) ->
maybe_send_receipt(receipt_id(Headers), State0), _ = maybe_send_receipt(receipt_id(Headers), State0),
nack(Id, State0) nack(Id, State0)
end, end,
case header(<<"transaction">>, Headers) of case header(<<"transaction">>, Headers) of
@ -226,7 +225,7 @@ received(#stomp_frame{command = <<"ABORT">>, headers = Headers}, State) ->
end; end;
received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) -> received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) ->
maybe_send_receipt(receipt_id(Headers), State), _ = maybe_send_receipt(receipt_id(Headers), State),
{stop, normal, State}. {stop, normal, State}.
send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},

View File

@ -132,6 +132,12 @@ get_telemetry() ->
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% This is to suppress dialyzer warnings for mnesia:dirty_write and
%% dirty_read race condition. Given that the init function is not evaluated
%% concurrently in one node, it should be free of race condition.
%% Given the chance of having two nodes bootstraping with the write
%% is very small, it should be safe to ignore.
-dialyzer([{nowarn_function, [init/1]}]).
init([Opts]) -> init([Opts]) ->
State = #state{url = get_value(url, Opts), State = #state{url = get_value(url, Opts),
report_interval = timer:seconds(get_value(report_interval, Opts))}, report_interval = timer:seconds(get_value(report_interval, Opts))},
@ -409,4 +415,4 @@ module_attributes(Module) ->
bin(L) when is_list(L) -> bin(L) when is_list(L) ->
list_to_binary(L); list_to_binary(L);
bin(B) when is_binary(B) -> bin(B) when is_binary(B) ->
B. B.

View File

@ -103,7 +103,7 @@ get_data(_Bindings, _Params) ->
return(get_telemetry_data()). return(get_telemetry_data()).
enable_telemetry() -> enable_telemetry() ->
[enable_telemetry(Node) || Node <- ekka_mnesia:running_nodes()], ok. lists:foreach(fun enable_telemetry/1, ekka_mnesia:running_nodes()).
enable_telemetry(Node) when Node =:= node() -> enable_telemetry(Node) when Node =:= node() ->
emqx_telemetry:enable(); emqx_telemetry:enable();
@ -111,7 +111,7 @@ enable_telemetry(Node) ->
rpc_call(Node, ?MODULE, enable_telemetry, [Node]). rpc_call(Node, ?MODULE, enable_telemetry, [Node]).
disable_telemetry() -> disable_telemetry() ->
[disable_telemetry(Node) || Node <- ekka_mnesia:running_nodes()], ok. lists:foreach(fun disable_telemetry/1, ekka_mnesia:running_nodes()).
disable_telemetry(Node) when Node =:= node() -> disable_telemetry(Node) when Node =:= node() ->
emqx_telemetry:disable(); emqx_telemetry:disable();
@ -128,4 +128,4 @@ rpc_call(Node, Module, Fun, Args) ->
case rpc:call(Node, Module, Fun, Args) of case rpc:call(Node, Module, Fun, Args) of
{badrpc, Reason} -> {error, Reason}; {badrpc, Reason} -> {error, Reason};
Result -> Result Result -> Result
end. end.

View File

@ -9,9 +9,9 @@
{minimum_otp_vsn, "21.3"}. {minimum_otp_vsn, "21.3"}.
{edoc_opts, [{preprocess,true}]}. {edoc_opts, [{preprocess,true}]}.
{erl_opts, [warn_unused_vars,warn_shadow_vars,warn_unused_import, {erl_opts, [warn_unused_vars,warn_shadow_vars,warn_unused_import,
warn_obsolete_guard,no_debug_info,compressed]}. warn_obsolete_guard,compressed]}.
{overrides,[{add,[{erl_opts,[no_debug_info,compressed,deterministic, {overrides,[{add,[{erl_opts,[compressed,deterministic,
{parse_transform,mod_vsn}]}]} {parse_transform,mod_vsn}]}]}
,{add,[{extra_src_dirs, [{"etc", [{recursive,true}]}]}]} ,{add,[{extra_src_dirs, [{"etc", [{recursive,true}]}]}]}
]}. ]}.
@ -46,7 +46,7 @@
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.2"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.2"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.0"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.0"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.2.0"}}} , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.2.0"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.3"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}}
, {getopt, "1.0.1"} , {getopt, "1.0.1"}

View File

@ -34,12 +34,18 @@ test_deps() ->
profiles() -> profiles() ->
[ {'emqx', [ {relx, relx('emqx')} [ {'emqx', [ {relx, relx('emqx')}
, {erl_opts, [no_debug_info]}
]} ]}
, {'emqx-pkg', [ {relx, relx('emqx-pkg')} , {'emqx-pkg', [ {relx, relx('emqx-pkg')}
, {erl_opts, [no_debug_info]}
]} ]}
, {'emqx-edge', [ {relx, relx('emqx-edge')} , {'emqx-edge', [ {relx, relx('emqx-edge')}
, {erl_opts, [no_debug_info]}
]} ]}
, {'emqx-edge-pkg', [ {relx, relx('emqx-edge-pkg')} , {'emqx-edge-pkg', [ {relx, relx('emqx-edge-pkg')}
, {erl_opts, [no_debug_info]}
]}
, {check, [ {erl_opts, [debug_info]}
]} ]}
, {test, [ {deps, test_deps()} , {test, [ {deps, test_deps()}
, {erl_opts, [debug_info]} , {erl_opts, [debug_info]}

View File

@ -86,7 +86,7 @@ restart(ConfFile) ->
reload_config(ConfFile), reload_config(ConfFile),
shutdown(), shutdown(),
ok = application:stop(mnesia), ok = application:stop(mnesia),
application:start(mnesia), _ = application:start(mnesia),
reboot(). reboot().
%% @doc Stop emqx application. %% @doc Stop emqx application.
@ -184,7 +184,7 @@ hook(HookPoint, Action, InitArgs) when is_list(InitArgs) ->
hook(HookPoint, Action, Filter, Priority) -> hook(HookPoint, Action, Filter, Priority) ->
emqx_hooks:add(HookPoint, Action, Filter, Priority). emqx_hooks:add(HookPoint, Action, Filter, Priority).
-spec(unhook(emqx_hooks:hookpoint(), function() | {module(), atom()}) -> ok). -spec(unhook(emqx_hooks:hookpoint(), fun() | {module(), atom()}) -> ok).
unhook(HookPoint, Action) -> unhook(HookPoint, Action) ->
emqx_hooks:del(HookPoint, Action). emqx_hooks:del(HookPoint, Action).
@ -205,8 +205,8 @@ shutdown() ->
shutdown(Reason) -> shutdown(Reason) ->
?LOG(critical, "emqx shutdown for ~s", [Reason]), ?LOG(critical, "emqx shutdown for ~s", [Reason]),
emqx_alarm_handler:unload(), _ = emqx_alarm_handler:unload(),
emqx_plugins:unload(), _ = emqx_plugins:unload(),
lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]). lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]).
reboot() -> reboot() ->

View File

@ -107,9 +107,7 @@ put_acl_cache(PubSub, Topic, AclResult) ->
%% delete all the acl entries %% delete all the acl entries
-spec(empty_acl_cache() -> ok). -spec(empty_acl_cache() -> ok).
empty_acl_cache() -> empty_acl_cache() ->
map_acl_cache(fun({CacheK, _CacheV}) -> foreach_acl_cache(fun({CacheK, _CacheV}) -> erlang:erase(CacheK) end),
erlang:erase(CacheK)
end),
set_cache_size(0), set_cache_size(0),
keys_queue_set(queue:new()). keys_queue_set(queue:new()).
@ -139,9 +137,13 @@ get_cache_size() ->
dump_acl_cache() -> dump_acl_cache() ->
map_acl_cache(fun(Cache) -> Cache end). map_acl_cache(fun(Cache) -> Cache end).
map_acl_cache(Fun) -> map_acl_cache(Fun) ->
[Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish
orelse SubPub =:= subscribe]. orelse SubPub =:= subscribe].
foreach_acl_cache(Fun) ->
_ = map_acl_cache(Fun),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions

View File

@ -165,6 +165,10 @@ init([Opts]) ->
size_limit = SizeLimit, size_limit = SizeLimit,
validity_period = ValidityPeriod})}. validity_period = ValidityPeriod})}.
%% suppress dialyzer warning due to dirty read/write race condition.
%% TODO: change from dirty_read/write to transactional.
%% TODO: handle mnesia write errors.
-dialyzer([{nowarn_function, [handle_call/3]}]).
handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Actions}) -> handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Actions}) ->
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
[#activated_alarm{name = Name}] -> [#activated_alarm{name = Name}] ->
@ -211,7 +215,7 @@ handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions,
end; end;
handle_call(delete_all_deactivated_alarms, _From, State) -> handle_call(delete_all_deactivated_alarms, _From, State) ->
mnesia:clear_table(?DEACTIVATED_ALARM), clear_table(?DEACTIVATED_ALARM),
{reply, ok, State}; {reply, ok, State};
handle_call({get_alarms, all}, _From, State) -> handle_call({get_alarms, all}, _From, State) ->
@ -266,7 +270,17 @@ deactivate_all_alarms() ->
message = Message, message = Message,
deactivate_at = erlang:system_time(microsecond)}) deactivate_at = erlang:system_time(microsecond)})
end, ets:tab2list(?ACTIVATED_ALARM)), end, ets:tab2list(?ACTIVATED_ALARM)),
mnesia:clear_table(?ACTIVATED_ALARM). clear_table(?ACTIVATED_ALARM).
%% Delete all records from the given table, ignore result.
clear_table(TableName) ->
case mnesia:clear_table(TableName) of
{aborted, Reason} ->
?LOG(warning, "Faile to clear table ~p reason: ~p",
[TableName, Reason]);
{atomic, ok} ->
ok
end.
ensure_delete_timer(State = #state{validity_period = ValidityPeriod}) -> ensure_delete_timer(State = #state{validity_period = ValidityPeriod}) ->
State#state{timer = emqx_misc:start_timer(ValidityPeriod div 1, delete_expired_deactivated_alarm)}. State#state{timer = emqx_misc:start_timer(ValidityPeriod div 1, delete_expired_deactivated_alarm)}.
@ -299,7 +313,8 @@ do_actions(Operation, Alarm, [publish | More]) ->
{ok, Payload} = encode_to_json(Alarm), {ok, Payload} = encode_to_json(Alarm),
Message = emqx_message:make(?MODULE, 0, Topic, Payload, #{sys => true}, Message = emqx_message:make(?MODULE, 0, Topic, Payload, #{sys => true},
#{properties => #{'Content-Type' => <<"application/json">>}}), #{properties => #{'Content-Type' => <<"application/json">>}}),
emqx_broker:safe_publish(Message), %% TODO log failed publishes
_ = emqx_broker:safe_publish(Message),
do_actions(Operation, Alarm, More). do_actions(Operation, Alarm, More).
encode_to_json(Alarm) -> encode_to_json(Alarm) ->

View File

@ -85,4 +85,4 @@ handle_call(_Query, State) ->
terminate(swap, _State) -> terminate(swap, _State) ->
{emqx_alarm_handler, []}; {emqx_alarm_handler, []};
terminate(_, _) -> terminate(_, _) ->
ok. ok.

View File

@ -34,18 +34,18 @@ start(_Type, _Args) ->
{ok, Sup} = emqx_sup:start_link(), {ok, Sup} = emqx_sup:start_link(),
ok = emqx_modules:load(), ok = emqx_modules:load(),
ok = emqx_plugins:init(), ok = emqx_plugins:init(),
emqx_plugins:load(), _ = emqx_plugins:load(),
emqx_boot:is_enabled(listeners) emqx_boot:is_enabled(listeners)
andalso (ok = emqx_listeners:start()), andalso (ok = emqx_listeners:start()),
start_autocluster(), start_autocluster(),
register(emqx, self()), register(emqx, self()),
emqx_alarm_handler:load(), ok = emqx_alarm_handler:load(),
print_vsn(), print_vsn(),
{ok, Sup}. {ok, Sup}.
-spec(stop(State :: term()) -> term()). -spec(stop(State :: term()) -> term()).
stop(_State) -> stop(_State) ->
emqx_alarm_handler:unload(), ok = emqx_alarm_handler:unload(),
emqx_boot:is_enabled(listeners) emqx_boot:is_enabled(listeners)
andalso emqx_listeners:stop(), andalso emqx_listeners:stop(),
emqx_modules:unload(). emqx_modules:unload().

View File

@ -465,7 +465,8 @@ handle_cast({subscribe, Topic}, State) ->
handle_cast({unsubscribed, Topic}, State) -> handle_cast({unsubscribed, Topic}, State) ->
case ets:member(?SUBSCRIBER, Topic) of case ets:member(?SUBSCRIBER, Topic) of
false -> false ->
_ = emqx_router:do_delete_route(Topic); _ = emqx_router:do_delete_route(Topic),
ok;
true -> ok true -> ok
end, end,
{noreply, State}; {noreply, State};

View File

@ -1554,15 +1554,20 @@ mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
Channel; Channel;
mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
case will_delay_interval(WillMsg) of case will_delay_interval(WillMsg) of
0 -> publish_will_msg(WillMsg), 0 ->
Channel#channel{will_msg = undefined}; ok = publish_will_msg(WillMsg),
I -> ensure_timer(will_timer, timer:seconds(I), Channel) Channel#channel{will_msg = undefined};
I ->
ensure_timer(will_timer, timer:seconds(I), Channel)
end. end.
will_delay_interval(WillMsg) -> will_delay_interval(WillMsg) ->
maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0). maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0).
publish_will_msg(Msg) -> emqx_broker:publish(Msg). publish_will_msg(Msg) ->
%% TODO check if we should discard result here
_ = emqx_broker:publish(Msg),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Disconnect Reason %% Disconnect Reason

View File

@ -252,6 +252,7 @@ run_loop(Parent, State = #state{transport = Transport,
exit_on_sock_error(Reason) exit_on_sock_error(Reason)
end. end.
-spec exit_on_sock_error(any()) -> no_return().
exit_on_sock_error(Reason) when Reason =:= einval; exit_on_sock_error(Reason) when Reason =:= einval;
Reason =:= enotconn; Reason =:= enotconn;
Reason =:= closed -> Reason =:= closed ->
@ -343,7 +344,7 @@ handle_msg({'$gen_call', From, Req}, State) ->
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
?LOG(debug, "RECV ~0p", [Data]), ?LOG(debug, "RECV ~0p", [Data]),
Oct = iolist_size(Data), Oct = iolist_size(Data),
emqx_pd:inc_counter(incoming_bytes, Oct), inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct), ok = emqx_metrics:inc('bytes.received', Oct),
parse_incoming(Data, State); parse_incoming(Data, State);
@ -439,11 +440,12 @@ handle_msg(Msg, State) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Terminate %% Terminate
-spec terminate(any(), state()) -> no_return().
terminate(Reason, State = #state{channel = Channel}) -> terminate(Reason, State = #state{channel = Channel}) ->
?LOG(debug, "Terminated due to ~p", [Reason]), ?LOG(debug, "Terminated due to ~p", [Reason]),
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)), emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)),
emqx_channel:terminate(Reason, Channel), emqx_channel:terminate(Reason, Channel),
close_socket(State), _ = close_socket(State),
exit(Reason). exit(Reason).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -611,7 +613,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) -> send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ->
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct), ok = emqx_metrics:inc('bytes.sent', Oct),
emqx_pd:inc_counter(outgoing_bytes, Oct), inc_counter(outgoing_bytes, Oct),
maybe_warn_congestion(Socket, Transport, Channel), maybe_warn_congestion(Socket, Transport, Channel),
case Transport:async_send(Socket, IoData, [nosuspend]) of case Transport:async_send(Socket, IoData, [nosuspend]) of
ok -> ok; ok -> ok;
@ -750,23 +752,25 @@ close_socket(State = #state{transport = Transport, socket = Socket}) ->
-compile({inline, [inc_incoming_stats/1]}). -compile({inline, [inc_incoming_stats/1]}).
inc_incoming_stats(Packet = ?PACKET(Type)) -> inc_incoming_stats(Packet = ?PACKET(Type)) ->
emqx_pd:inc_counter(recv_pkt, 1), inc_counter(recv_pkt, 1),
if case Type =:= ?PUBLISH of
Type == ?PUBLISH -> true ->
emqx_pd:inc_counter(recv_msg, 1), inc_counter(recv_msg, 1),
emqx_pd:inc_counter(incoming_pubs, 1); inc_counter(incoming_pubs, 1);
true -> ok false ->
ok
end, end,
emqx_metrics:inc_recv(Packet). emqx_metrics:inc_recv(Packet).
-compile({inline, [inc_outgoing_stats/1]}). -compile({inline, [inc_outgoing_stats/1]}).
inc_outgoing_stats(Packet = ?PACKET(Type)) -> inc_outgoing_stats(Packet = ?PACKET(Type)) ->
emqx_pd:inc_counter(send_pkt, 1), inc_counter(send_pkt, 1),
if case Type =:= ?PUBLISH of
Type == ?PUBLISH -> true ->
emqx_pd:inc_counter(send_msg, 1), inc_counter(send_msg, 1),
emqx_pd:inc_counter(outgoing_pubs, 1); inc_counter(outgoing_pubs, 1);
true -> ok false ->
ok
end, end,
emqx_metrics:inc_sent(Packet). emqx_metrics:inc_sent(Packet).
@ -795,6 +799,10 @@ stop(Reason, State) ->
stop(Reason, Reply, State) -> stop(Reason, Reply, State) ->
{stop, Reason, Reply, State}. {stop, Reason, Reply, State}.
inc_counter(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% For CT tests %% For CT tests
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -31,6 +31,7 @@
-export([ add/2 -export([ add/2
, add/3 , add/3
, add/4 , add/4
, put/2
, del/2 , del/2
, run/2 , run/2
, run_fold/3 , run_fold/3
@ -111,6 +112,14 @@ add(HookPoint, Action, Priority) when is_integer(Priority) ->
add(HookPoint, Action, Filter, Priority) when is_integer(Priority) -> add(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
%% @doc Like add/2, it register a callback, discard 'already_exists' error.
-spec(put(hookpoint(), action() | #callback{}) -> ok).
put(HookPoint, Callback) ->
case add(HookPoint, Callback) of
ok -> ok;
{error, already_exists} -> ok
end.
%% @doc Unregister a callback. %% @doc Unregister a callback.
-spec(del(hookpoint(), function() | {module(), atom()}) -> ok). -spec(del(hookpoint(), function() | {module(), atom()}) -> ok).
del(HookPoint, Action) -> del(HookPoint, Action) ->

View File

@ -124,10 +124,10 @@ restart_listener(tcp, ListenOn, _Options) ->
restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls -> restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls ->
esockd:reopen('mqtt:ssl', ListenOn); esockd:reopen('mqtt:ssl', ListenOn);
restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)), _ = cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)),
start_listener(Proto, ListenOn, Options); start_listener(Proto, ListenOn, Options);
restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)), _ = cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)),
start_listener(Proto, ListenOn, Options); start_listener(Proto, ListenOn, Options);
restart_listener(Proto, ListenOn, _Opts) -> restart_listener(Proto, ListenOn, _Opts) ->
esockd:reopen(Proto, ListenOn). esockd:reopen(Proto, ListenOn).

View File

@ -268,7 +268,7 @@ set_all_log_handlers_level([], _NewLevel, _NewHanlder) ->
ok. ok.
rollback([{ID, Level} | List]) -> rollback([{ID, Level} | List]) ->
set_log_handler_level(ID, Level), _ = set_log_handler_level(ID, Level),
rollback(List); rollback(List);
rollback([]) -> ok. rollback([]) -> ok.

View File

@ -38,8 +38,8 @@
-endif. -endif.
load(Env) -> load(Env) ->
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Env]}), emqx_hooks:put('client.connected', {?MODULE, on_client_connected, [Env]}),
emqx_hooks:add('client.disconnected', {?MODULE, on_client_disconnected, [Env]}). emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, [Env]}).
unload(_Env) -> unload(_Env) ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}), emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),

View File

@ -45,9 +45,9 @@
load(RawRules) -> load(RawRules) ->
{PubRules, SubRules} = compile(RawRules), {PubRules, SubRules} = compile(RawRules),
emqx_hooks:add('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}), emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
emqx_hooks:add('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
emqx_hooks:add('message.publish', {?MODULE, rewrite_publish, [PubRules]}). emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}).
rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.

View File

@ -40,11 +40,13 @@
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec start_child(supervisor:child_spec()) -> ok.
start_child(ChildSpec) when is_map(ChildSpec) -> start_child(ChildSpec) when is_map(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec). assert_started(supervisor:start_child(?MODULE, ChildSpec)).
-spec start_child(atom(), atom()) -> ok.
start_child(Mod, Type) when is_atom(Mod) andalso is_atom(Type) -> start_child(Mod, Type) when is_atom(Mod) andalso is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Type))).
-spec(stop_child(any()) -> ok | {error, term()}). -spec(stop_child(any()) -> ok | {error, term()}).
stop_child(ChildId) -> stop_child(ChildId) ->
@ -61,3 +63,12 @@ init([]) ->
ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]), ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]),
{ok, {{one_for_one, 10, 100}, []}}. {ok, {{one_for_one, 10, 100}, []}}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_tarted, _Pid}}) -> ok;
assert_started({error, Reason}) -> erlang:error(Reason).

View File

@ -102,14 +102,14 @@
load(_Env) -> load(_Env) ->
emqx_mod_sup:start_child(?MODULE, worker), emqx_mod_sup:start_child(?MODULE, worker),
emqx:hook('message.publish', {?MODULE, on_message_publish, []}), emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
emqx:hook('message.dropped', {?MODULE, on_message_dropped, []}), emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}),
emqx:hook('message.delivered', {?MODULE, on_message_delivered, []}). emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}).
unload(_Env) -> unload(_Env) ->
emqx:unhook('message.publish', {?MODULE, on_message_publish}), emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
emqx:unhook('message.dropped', {?MODULE, on_message_dropped}), emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped}),
emqx:unhook('message.delivered', {?MODULE, on_message_delivered}), emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}),
emqx_mod_sup:stop_child(?MODULE). emqx_mod_sup:stop_child(?MODULE).
description() -> description() ->
@ -118,7 +118,7 @@ description() ->
on_message_publish(#message{topic = Topic, qos = QoS}) -> on_message_publish(#message{topic = Topic, qos = QoS}) ->
case is_registered(Topic) of case is_registered(Topic) of
true -> true ->
inc(Topic, 'messages.in'), try_inc(Topic, 'messages.in'),
case QoS of case QoS of
?QOS_0 -> inc(Topic, 'messages.qos0.in'); ?QOS_0 -> inc(Topic, 'messages.qos0.in');
?QOS_1 -> inc(Topic, 'messages.qos1.in'); ?QOS_1 -> inc(Topic, 'messages.qos1.in');
@ -131,7 +131,7 @@ on_message_publish(#message{topic = Topic, qos = QoS}) ->
on_message_delivered(_, #message{topic = Topic, qos = QoS}) -> on_message_delivered(_, #message{topic = Topic, qos = QoS}) ->
case is_registered(Topic) of case is_registered(Topic) of
true -> true ->
inc(Topic, 'messages.out'), try_inc(Topic, 'messages.out'),
case QoS of case QoS of
?QOS_0 -> inc(Topic, 'messages.qos0.out'); ?QOS_0 -> inc(Topic, 'messages.qos0.out');
?QOS_1 -> inc(Topic, 'messages.qos1.out'); ?QOS_1 -> inc(Topic, 'messages.qos1.out');
@ -155,6 +155,10 @@ start_link() ->
stop() -> stop() ->
gen_server:stop(?MODULE). gen_server:stop(?MODULE).
try_inc(Topic, Metric) ->
_ = inc(Topic, Metric),
ok.
inc(Topic, Metric) -> inc(Topic, Metric) ->
inc(Topic, Metric, 1). inc(Topic, Metric, 1).

View File

@ -77,14 +77,17 @@ unload(ModuleName) ->
unload_module(ModuleName, true) unload_module(ModuleName, true)
end. end.
-spec(reload(module()) -> ok | ignore | {error, any()}).
reload(emqx_mod_acl_internal) -> reload(emqx_mod_acl_internal) ->
Modules = emqx:get_env(modules, []), Modules = emqx:get_env(modules, []),
Env = proplists:get_value(emqx_mod_acl_internal, Modules, undefined), Env = proplists:get_value(emqx_mod_acl_internal, Modules, undefined),
case emqx_mod_acl_internal:reload(Env) of case emqx_mod_acl_internal:reload(Env) of
ok -> ok ->
?LOG(info, "Reload ~s module successfully.", [emqx_mod_acl_internal]); ?LOG(info, "Reload ~s module successfully.", [emqx_mod_acl_internal]),
ok;
{error, Error} -> {error, Error} ->
?LOG(error, "Reload module ~s failed, cannot start for ~0p", [emqx_mod_acl_internal, Error]) ?LOG(error, "Reload module ~s failed, cannot start for ~0p", [emqx_mod_acl_internal, Error]),
{error, Error}
end; end;
reload(_) -> reload(_) ->
ignore. ignore.
@ -125,7 +128,7 @@ load_module(ModuleName, Persistent) ->
case ModuleName:load(Env) of case ModuleName:load(Env) of
ok -> ok ->
ets:insert(?MODULE, {ModuleName, true}), ets:insert(?MODULE, {ModuleName, true}),
write_loaded(Persistent), ok = write_loaded(Persistent),
?LOG(info, "Load ~s module successfully.", [ModuleName]); ?LOG(info, "Load ~s module successfully.", [ModuleName]);
{error, Error} -> {error, Error} ->
?LOG(error, "Load module ~s failed, cannot load for ~0p", [ModuleName, Error]), ?LOG(error, "Load module ~s failed, cannot load for ~0p", [ModuleName, Error]),
@ -152,7 +155,7 @@ unload_module(ModuleName, Persistent) ->
case ModuleName:unload(Env) of case ModuleName:unload(Env) of
ok -> ok ->
ets:insert(?MODULE, {ModuleName, false}), ets:insert(?MODULE, {ModuleName, false}),
write_loaded(Persistent), ok = write_loaded(Persistent),
?LOG(info, "Unload ~s module successfully.", [ModuleName]); ?LOG(info, "Unload ~s module successfully.", [ModuleName]);
{error, Error} -> {error, Error} ->
?LOG(error, "Unload module ~s failed, cannot unload for ~0p", [ModuleName, Error]) ?LOG(error, "Unload module ~s failed, cannot unload for ~0p", [ModuleName, Error])
@ -164,6 +167,6 @@ write_loaded(true) ->
ok -> ok; ok -> ok;
{error, Error} -> {error, Error} ->
?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]), ?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]),
{error, Error} ok
end; end;
write_loaded(false) -> ok. write_loaded(false) -> ok.

View File

@ -59,13 +59,13 @@ init() ->
end. end.
%% @doc Load all plugins when the broker started. %% @doc Load all plugins when the broker started.
-spec(load() -> list() | {error, term()}). -spec(load() -> ok | ignore | {error, term()}).
load() -> load() ->
load_expand_plugins(), load_expand_plugins(),
case emqx:get_env(plugins_loaded_file) of case emqx:get_env(plugins_loaded_file) of
undefined -> ignore; %% No plugins available undefined -> ignore; %% No plugins available
File -> File ->
ensure_file(File), _ = ensure_file(File),
with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end) with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end)
end. end.
@ -103,7 +103,7 @@ unload(PluginName) when is_atom(PluginName) ->
?LOG(error, "Plugin ~s is not started", [PluginName]), ?LOG(error, "Plugin ~s is not started", [PluginName]),
{error, not_started}; {error, not_started};
{_, _} -> {_, _} ->
unload_plugin(PluginName, true) unload_plugin(PluginName, true)
end. end.
reload(PluginName) when is_atom(PluginName)-> reload(PluginName) when is_atom(PluginName)->
@ -165,7 +165,7 @@ load_expand_plugins() ->
load_expand_plugin(PluginDir) -> load_expand_plugin(PluginDir) ->
init_expand_plugin_config(PluginDir), init_expand_plugin_config(PluginDir),
Ebin = filename:join([PluginDir, "ebin"]), Ebin = filename:join([PluginDir, "ebin"]),
code:add_patha(Ebin), _ = code:add_patha(Ebin),
Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])), Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
lists:foreach(fun(Mod) -> lists:foreach(fun(Mod) ->
Module = list_to_atom(filename:basename(Mod, ".beam")), Module = list_to_atom(filename:basename(Mod, ".beam")),
@ -246,7 +246,7 @@ apply_configs([{App, Config} | More]) ->
%% Stop plugins %% Stop plugins
stop_plugins(Names) -> stop_plugins(Names) ->
[stop_app(App) || App <- Names], _ = [stop_app(App) || App <- Names],
ok. ok.
plugin(AppName, Type) -> plugin(AppName, Type) ->
@ -287,7 +287,7 @@ start_app(App, SuccFun) ->
{ok, Started} -> {ok, Started} ->
?LOG(info, "Started plugins: ~p", [Started]), ?LOG(info, "Started plugins: ~p", [Started]),
?LOG(info, "Load plugin ~s successfully", [App]), ?LOG(info, "Load plugin ~s successfully", [App]),
SuccFun(App), _ = SuccFun(App),
ok; ok;
{error, {ErrApp, Reason}} -> {error, {ErrApp, Reason}} ->
?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~0p", [App, ErrApp, Reason]), ?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~0p", [App, ErrApp, Reason]),
@ -297,7 +297,7 @@ start_app(App, SuccFun) ->
unload_plugin(App, Persistent) -> unload_plugin(App, Persistent) ->
case stop_app(App) of case stop_app(App) of
ok -> ok ->
plugin_unloaded(App, Persistent), ok; _ = plugin_unloaded(App, Persistent), ok;
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end. end.

View File

@ -151,16 +151,16 @@ handle_cast(Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) -> handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
publish(uptime, iolist_to_binary(uptime(State))), publish_any(uptime, iolist_to_binary(uptime(State))),
publish(datetime, iolist_to_binary(datetime())), publish_any(datetime, iolist_to_binary(datetime())),
{noreply, heartbeat(State)}; {noreply, heartbeat(State)};
handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Version, sysdescr = Descr}) -> handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Version, sysdescr = Descr}) ->
publish(version, Version), publish_any(version, Version),
publish(sysdescr, Descr), publish_any(sysdescr, Descr),
publish(brokers, ekka_mnesia:running_nodes()), publish_any(brokers, ekka_mnesia:running_nodes()),
publish(stats, emqx_stats:getstats()), publish_any(stats, emqx_stats:getstats()),
publish(metrics, emqx_metrics:all()), publish_any(metrics, emqx_metrics:all()),
{noreply, tick(State), hibernate}; {noreply, tick(State), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
@ -192,6 +192,10 @@ uptime(hours, H) ->
uptime(days, D) -> uptime(days, D) ->
[integer_to_list(D), " days, "]. [integer_to_list(D), " days, "].
publish_any(Name, Value) ->
_ = publish(Name, Value),
ok.
publish(uptime, Uptime) -> publish(uptime, Uptime) ->
safe_publish(systop(uptime), Uptime); safe_publish(systop(uptime), Uptime);
publish(datetime, Datetime) -> publish(datetime, Datetime) ->

View File

@ -55,7 +55,7 @@ start_link(Opts) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([Opts]) -> init([Opts]) ->
erlang:system_monitor(self(), parse_opt(Opts)), _ = erlang:system_monitor(self(), parse_opt(Opts)),
emqx_logger:set_proc_metadata(#{sysmon => true}), emqx_logger:set_proc_metadata(#{sysmon => true}),
%% Monitor cluster partition event %% Monitor cluster partition event
@ -174,7 +174,7 @@ suppress(Key, SuccFun, State = #{events := Events}) ->
true -> true ->
{noreply, State}; {noreply, State};
false -> false ->
SuccFun(), _ = SuccFun(),
{noreply, State#{events := [Key|Events]}} {noreply, State#{events := [Key|Events]}}
end. end.

View File

@ -113,18 +113,18 @@
peername := peername(), peername := peername(),
peercert := nossl | undefined | esockd_peercert:peercert(), peercert := nossl | undefined | esockd_peercert:peercert(),
conn_mod := module(), conn_mod := module(),
proto_name := binary(), proto_name => binary(),
proto_ver := ver(), proto_ver => ver(),
clean_start := boolean(), clean_start => boolean(),
clientid := clientid(), clientid => clientid(),
username := username(), username => username(),
conn_props := properties(), conn_props => properties(),
connected := boolean(), connected => boolean(),
connected_at := non_neg_integer(), connected_at => non_neg_integer(),
disconnected_at => non_neg_integer(), disconnected_at => non_neg_integer(),
keepalive := 0..16#FFFF, keepalive => 0..16#FFFF,
receive_maximum := non_neg_integer(), receive_maximum => non_neg_integer(),
expiry_interval := non_neg_integer(), expiry_interval => non_neg_integer(),
atom() => term() atom() => term()
}). }).
-type(clientinfo() :: #{zone := zone(), -type(clientinfo() :: #{zone := zone(),
@ -148,7 +148,8 @@
-type(username() :: maybe(binary())). -type(username() :: maybe(binary())).
-type(password() :: maybe(binary())). -type(password() :: maybe(binary())).
-type(peerhost() :: inet:ip_address()). -type(peerhost() :: inet:ip_address()).
-type(peername() :: {inet:ip_address(), inet:port_number()}). -type(peername() :: {inet:ip_address(), inet:port_number()}
| inet:returned_non_ip_address()).
-type(protocol() :: mqtt | 'mqtt-sn' | coap | lwm2m | stomp | none | atom()). -type(protocol() :: mqtt | 'mqtt-sn' | coap | lwm2m | stomp | none | atom()).
-type(auth_result() :: success -type(auth_result() :: success
| client_identifier_not_valid | client_identifier_not_valid

View File

@ -63,7 +63,7 @@
%% Simulate the active_n opt %% Simulate the active_n opt
active_n :: pos_integer(), active_n :: pos_integer(),
%% MQTT Piggyback %% MQTT Piggyback
mqtt_piggyback :: single | multiple, mqtt_piggyback :: single | multiple,
%% Limiter %% Limiter
limiter :: maybe(emqx_limiter:limiter()), limiter :: maybe(emqx_limiter:limiter()),
%% Limit Timer %% Limit Timer
@ -535,7 +535,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT
postpone({check_gc, Stats}, State); postpone({check_gc, Stats}, State);
false -> State false -> State
end, end,
{case MQTTPiggyback of {case MQTTPiggyback of
single -> [{binary, IoData}]; single -> [{binary, IoData}];
multiple -> lists:map(fun(Bin) -> {binary, Bin} end, IoData) multiple -> lists:map(fun(Bin) -> {binary, Bin} end, IoData)
@ -568,35 +568,38 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
]}). ]}).
inc_recv_stats(Cnt, Oct) -> inc_recv_stats(Cnt, Oct) ->
emqx_pd:inc_counter(incoming_bytes, Oct), inc_counter(incoming_bytes, Oct),
emqx_pd:inc_counter(recv_cnt, Cnt), inc_counter(recv_cnt, Cnt),
emqx_pd:inc_counter(recv_oct, Oct), inc_counter(recv_oct, Oct),
emqx_metrics:inc('bytes.received', Oct). emqx_metrics:inc('bytes.received', Oct).
inc_incoming_stats(Packet = ?PACKET(Type)) -> inc_incoming_stats(Packet = ?PACKET(Type)) ->
emqx_pd:inc_counter(recv_pkt, 1), _ = emqx_pd:inc_counter(recv_pkt, 1),
if Type == ?PUBLISH -> if Type == ?PUBLISH ->
emqx_pd:inc_counter(recv_msg, 1), inc_counter(recv_msg, 1),
emqx_pd:inc_counter(incoming_pubs, 1); inc_counter(incoming_pubs, 1);
true -> ok true -> ok
end, end,
emqx_metrics:inc_recv(Packet). emqx_metrics:inc_recv(Packet).
inc_outgoing_stats(Packet = ?PACKET(Type)) -> inc_outgoing_stats(Packet = ?PACKET(Type)) ->
emqx_pd:inc_counter(send_pkt, 1), _ = emqx_pd:inc_counter(send_pkt, 1),
if Type == ?PUBLISH -> if Type == ?PUBLISH ->
emqx_pd:inc_counter(send_msg, 1), inc_counter(send_msg, 1),
emqx_pd:inc_counter(outgoing_pubs, 1); inc_counter(outgoing_pubs, 1);
true -> ok true -> ok
end, end,
emqx_metrics:inc_sent(Packet). emqx_metrics:inc_sent(Packet).
inc_sent_stats(Cnt, Oct) -> inc_sent_stats(Cnt, Oct) ->
emqx_pd:inc_counter(outgoing_bytes, Oct), inc_counter(outgoing_bytes, Oct),
emqx_pd:inc_counter(send_cnt, Cnt), inc_counter(send_cnt, Cnt),
emqx_pd:inc_counter(send_oct, Oct), inc_counter(send_oct, Oct),
emqx_metrics:inc('bytes.sent', Oct). emqx_metrics:inc('bytes.sent', Oct).
inc_counter(Name, Value) ->
_ = emqx_pd:inc_counter(Name, Value),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------