commit
40963221f8
|
@ -71,7 +71,9 @@
|
||||||
%% Message Payload
|
%% Message Payload
|
||||||
payload :: emqx_types:payload(),
|
payload :: emqx_types:payload(),
|
||||||
%% Timestamp (Unit: millisecond)
|
%% Timestamp (Unit: millisecond)
|
||||||
timestamp :: integer()
|
timestamp :: integer(),
|
||||||
|
%% not used so far, for future extension
|
||||||
|
extra = [] :: term()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-record(delivery, {
|
-record(delivery, {
|
||||||
|
|
|
@ -41,6 +41,7 @@
|
||||||
|
|
||||||
-define(LOG(Level, Format), ?LOG(Level, Format, [])).
|
-define(LOG(Level, Format), ?LOG(Level, Format, [])).
|
||||||
|
|
||||||
|
%% deprecated
|
||||||
-define(LOG(Level, Format, Args, Meta),
|
-define(LOG(Level, Format, Args, Meta),
|
||||||
%% check 'allow' here so we do not have to pass an anonymous function
|
%% check 'allow' here so we do not have to pass an anonymous function
|
||||||
%% down to logger which may cause `badfun` exception during upgrade
|
%% down to logger which may cause `badfun` exception during upgrade
|
||||||
|
@ -58,8 +59,15 @@
|
||||||
|
|
||||||
%% structured logging
|
%% structured logging
|
||||||
-define(SLOG(Level, Data),
|
-define(SLOG(Level, Data),
|
||||||
logger:log(Level, Data, #{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}
|
%% check 'allow' here, only evaluate Data when necessary
|
||||||
, line => ?LINE})).
|
case logger:allow(Level, ?MODULE) of
|
||||||
|
true ->
|
||||||
|
logger:log(Level, (Data), #{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}
|
||||||
|
, line => ?LINE
|
||||||
|
});
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end).
|
||||||
|
|
||||||
%% print to 'user' group leader
|
%% print to 'user' group leader
|
||||||
-define(ULOG(Fmt, Args), io:format(user, Fmt, Args)).
|
-define(ULOG(Fmt, Args), io:format(user, Fmt, Args)).
|
||||||
|
|
|
@ -239,17 +239,11 @@ handle_call({get_alarms, deactivated}, _From, State) ->
|
||||||
{reply, Alarms, State};
|
{reply, Alarms, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
msg => "unexpected_call",
|
|
||||||
call => Req
|
|
||||||
}),
|
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
msg => "unexpected_msg",
|
|
||||||
payload => Msg
|
|
||||||
}),
|
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, _TRef, delete_expired_deactivated_alarm},
|
handle_info({timeout, _TRef, delete_expired_deactivated_alarm},
|
||||||
|
@ -259,10 +253,7 @@ handle_info({timeout, _TRef, delete_expired_deactivated_alarm},
|
||||||
{noreply, State#state{timer = ensure_timer(TRef, Period)}};
|
{noreply, State#state{timer = ensure_timer(TRef, Period)}};
|
||||||
|
|
||||||
handle_info({update_timer, Period}, #state{timer = TRef} = State) ->
|
handle_info({update_timer, Period}, #state{timer = TRef} = State) ->
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => "validity_timer_updated", period => Period}),
|
||||||
msg => "update_the_validity_period_timer",
|
|
||||||
period => Period
|
|
||||||
}),
|
|
||||||
{noreply, State#state{timer = ensure_timer(TRef, Period)}};
|
{noreply, State#state{timer = ensure_timer(TRef, Period)}};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
|
@ -311,7 +311,7 @@ do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | M
|
||||||
catch
|
catch
|
||||||
Class:Reason:Stacktrace ->
|
Class:Reason:Stacktrace ->
|
||||||
?SLOG(warning, #{msg => "unexpected_error_in_authentication",
|
?SLOG(warning, #{msg => "unexpected_error_in_authentication",
|
||||||
class => Class,
|
exception => Class,
|
||||||
reason => Reason,
|
reason => Reason,
|
||||||
stacktrace => Stacktrace,
|
stacktrace => Stacktrace,
|
||||||
authenticator => ID}),
|
authenticator => ID}),
|
||||||
|
@ -652,11 +652,11 @@ handle_call({list_users, ChainName, AuthenticatorID}, _From, State) ->
|
||||||
reply(Reply, State);
|
reply(Reply, State);
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast(Req, State) ->
|
handle_cast(Req, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Req}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
|
@ -187,14 +187,11 @@ init([]) ->
|
||||||
{ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
|
{ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "unexpected_msg", cast => Msg}),
|
||||||
msg => "unexpected_msg",
|
|
||||||
payload => Msg
|
|
||||||
}),
|
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
|
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
|
||||||
|
|
|
@ -202,10 +202,8 @@ publish(Msg) when is_record(Msg, message) ->
|
||||||
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
|
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
|
||||||
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
|
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
|
||||||
#message{headers = #{allow_publish := false}} ->
|
#message{headers = #{allow_publish := false}} ->
|
||||||
?SLOG(notice, #{
|
?SLOG(debug, #{msg => "message_not_published",
|
||||||
msg => "stop_publishing",
|
payload => emqx_message:to_log_map(Msg)}),
|
||||||
payload => emqx_message:format(Msg)
|
|
||||||
}),
|
|
||||||
[];
|
[];
|
||||||
Msg1 = #message{topic = Topic} ->
|
Msg1 = #message{topic = Topic} ->
|
||||||
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
|
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
|
||||||
|
@ -217,11 +215,12 @@ safe_publish(Msg) when is_record(Msg, message) ->
|
||||||
try
|
try
|
||||||
publish(Msg)
|
publish(Msg)
|
||||||
catch
|
catch
|
||||||
_:Error:Stk->
|
Error : Reason : Stk->
|
||||||
?SLOG(error,#{
|
?SLOG(error,#{
|
||||||
msg => "publishing_error",
|
msg => "publishing_error",
|
||||||
error => Error,
|
exception => Error,
|
||||||
payload => Msg,
|
reason => Reason,
|
||||||
|
payload => emqx_message:to_log_map(Msg),
|
||||||
stacktrace => Stk
|
stacktrace => Stk
|
||||||
}),
|
}),
|
||||||
[]
|
[]
|
||||||
|
@ -465,17 +464,14 @@ handle_call({subscribe, Topic, I}, _From, State) ->
|
||||||
{reply, Ok, State};
|
{reply, Ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast({subscribe, Topic}, State) ->
|
handle_cast({subscribe, Topic}, State) ->
|
||||||
case emqx_router:do_add_route(Topic) of
|
case emqx_router:do_add_route(Topic) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "failed_to_add_route", reason => Reason})
|
||||||
msg => "failed_to_add_route",
|
|
||||||
reason => Reason
|
|
||||||
})
|
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
|
@ -499,7 +495,7 @@ handle_cast({unsubscribed, Topic, I}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
|
@ -118,7 +118,7 @@ init([]) ->
|
||||||
{ok, #{pmon => emqx_pmon:new()}}.
|
{ok, #{pmon => emqx_pmon:new()}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
|
handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
|
||||||
|
@ -127,7 +127,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
|
||||||
{noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}};
|
{noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->
|
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->
|
||||||
|
|
|
@ -373,17 +373,11 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
|
||||||
ok = after_message_acked(ClientInfo, Msg, Properties),
|
ok = after_message_acked(ClientInfo, Msg, Properties),
|
||||||
handle_out(publish, Publishes, Channel#channel{session = NSession});
|
handle_out(publish, Publishes, Channel#channel{session = NSession});
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}),
|
||||||
msg => "puback_packetId_inuse",
|
|
||||||
packetId => PacketId
|
|
||||||
}),
|
|
||||||
ok = emqx_metrics:inc('packets.puback.inuse'),
|
ok = emqx_metrics:inc('packets.puback.inuse'),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => "puback_packetId_not_found", packetId => PacketId}),
|
||||||
msg => "puback_packetId_not_found",
|
|
||||||
packetId => PacketId
|
|
||||||
}),
|
|
||||||
ok = emqx_metrics:inc('packets.puback.missed'),
|
ok = emqx_metrics:inc('packets.puback.missed'),
|
||||||
{ok, Channel}
|
{ok, Channel}
|
||||||
end;
|
end;
|
||||||
|
@ -507,17 +501,11 @@ handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState})
|
||||||
handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
|
handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
|
||||||
|
|
||||||
handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
|
handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}),
|
||||||
msg => "malformed_mqtt_message",
|
|
||||||
reason => Reason
|
|
||||||
}),
|
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_in(Packet, Channel) ->
|
handle_in(Packet, Channel) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}),
|
||||||
msg => "disconnecting_due_to_unexpected_message",
|
|
||||||
packet => Packet
|
|
||||||
}),
|
|
||||||
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
|
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -541,10 +529,7 @@ process_connect(AckProps, Channel = #channel{conninfo = ConnInfo,
|
||||||
{error, client_id_unavailable} ->
|
{error, client_id_unavailable} ->
|
||||||
handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel);
|
handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "failed_to_open_session", reason => Reason}),
|
||||||
msg => "failed_to_open_session",
|
|
||||||
reason => Reason
|
|
||||||
}),
|
|
||||||
handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel)
|
handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -995,7 +980,7 @@ handle_call({quota, Policy}, Channel) ->
|
||||||
reply(ok, Channel#channel{quota = Quota});
|
reply(ok, Channel#channel{quota = Quota});
|
||||||
|
|
||||||
handle_call(Req, Channel) ->
|
handle_call(Req, Channel) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
reply(ignored, Channel).
|
reply(ignored, Channel).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -1035,10 +1020,7 @@ handle_info({sock_closed, Reason}, Channel =
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
|
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "unexpected_sock_close", reason => Reason}),
|
||||||
msg => "unexpected_sock_closed",
|
|
||||||
reason => Reason
|
|
||||||
}),
|
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(clean_authz_cache, Channel) ->
|
handle_info(clean_authz_cache, Channel) ->
|
||||||
|
@ -1109,10 +1091,7 @@ handle_timeout(_TRef, expire_quota_limit, Channel) ->
|
||||||
{ok, clean_timer(quota_timer, Channel)};
|
{ok, clean_timer(quota_timer, Channel)};
|
||||||
|
|
||||||
handle_timeout(_TRef, Msg, Channel) ->
|
handle_timeout(_TRef, Msg, Channel) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "unexpected_timeout", timeout_message => Msg}),
|
||||||
msg => "unexpected_timeout",
|
|
||||||
payload => Msg
|
|
||||||
}),
|
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -266,9 +266,8 @@ get_mqtt_conf(Zone, Key) ->
|
||||||
emqx_config:get_zone_conf(Zone, [mqtt, Key]).
|
emqx_config:get_zone_conf(Zone, [mqtt, Key]).
|
||||||
|
|
||||||
%% @doc Try to takeover a session.
|
%% @doc Try to takeover a session.
|
||||||
-spec(takeover_session(emqx_types:clientid())
|
-spec(takeover_session(emqx_types:clientid()) ->
|
||||||
-> {error, term()}
|
{error, term()} | {ok, atom(), pid(), emqx_session:session()}).
|
||||||
| {ok, atom(), pid(), emqx_session:session()}).
|
|
||||||
takeover_session(ClientId) ->
|
takeover_session(ClientId) ->
|
||||||
case lookup_channels(ClientId) of
|
case lookup_channels(ClientId) of
|
||||||
[] -> {error, not_found};
|
[] -> {error, not_found};
|
||||||
|
@ -276,10 +275,7 @@ takeover_session(ClientId) ->
|
||||||
takeover_session(ClientId, ChanPid);
|
takeover_session(ClientId, ChanPid);
|
||||||
ChanPids ->
|
ChanPids ->
|
||||||
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
||||||
?SLOG(error, #{
|
?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}),
|
||||||
msg => "more_than_one_channel_found",
|
|
||||||
chan_pids => ChanPids
|
|
||||||
}),
|
|
||||||
lists:foreach(fun(StalePid) ->
|
lists:foreach(fun(StalePid) ->
|
||||||
catch discard_session(ClientId, StalePid)
|
catch discard_session(ClientId, StalePid)
|
||||||
end, StalePids),
|
end, StalePids),
|
||||||
|
@ -344,10 +340,7 @@ kick_session(ClientId) ->
|
||||||
kick_session(ClientId, ChanPid);
|
kick_session(ClientId, ChanPid);
|
||||||
ChanPids ->
|
ChanPids ->
|
||||||
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
||||||
?SLOG(error, #{
|
?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}),
|
||||||
msg => "more_than_one_channel_found",
|
|
||||||
chan_pids => ChanPids
|
|
||||||
}),
|
|
||||||
lists:foreach(fun(StalePid) ->
|
lists:foreach(fun(StalePid) ->
|
||||||
catch discard_session(ClientId, StalePid)
|
catch discard_session(ClientId, StalePid)
|
||||||
end, StalePids),
|
end, StalePids),
|
||||||
|
@ -422,7 +415,7 @@ init([]) ->
|
||||||
{ok, #{chan_pmon => emqx_pmon:new()}}.
|
{ok, #{chan_pmon => emqx_pmon:new()}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
|
handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
|
||||||
|
@ -430,7 +423,7 @@ handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
|
||||||
{noreply, State#{chan_pmon := PMon1}};
|
{noreply, State#{chan_pmon := PMon1}};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
||||||
|
|
|
@ -114,11 +114,11 @@ init([]) ->
|
||||||
{ok, #{}}.
|
{ok, #{}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({membership, {mnesia, down, Node}}, State) ->
|
handle_info({membership, {mnesia, down, Node}}, State) ->
|
||||||
|
|
|
@ -119,9 +119,9 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
|
||||||
catch Error:Reason:ST ->
|
catch Error:Reason:ST ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "change_config_failed",
|
msg => "change_config_failed",
|
||||||
error => Error,
|
exception => Error,
|
||||||
reason => Reason,
|
reason => Reason,
|
||||||
st => ST
|
stacktrace => ST
|
||||||
}),
|
}),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -417,20 +417,14 @@ handle_msg({'$gen_cast', Req}, State) ->
|
||||||
{ok, NewState};
|
{ok, NewState};
|
||||||
|
|
||||||
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{msg => "RECV_data", data => Data, transport => Inet}),
|
||||||
msg => "RECV_data",
|
|
||||||
data => Data
|
|
||||||
}),
|
|
||||||
Oct = iolist_size(Data),
|
Oct = iolist_size(Data),
|
||||||
inc_counter(incoming_bytes, Oct),
|
inc_counter(incoming_bytes, Oct),
|
||||||
ok = emqx_metrics:inc('bytes.received', Oct),
|
ok = emqx_metrics:inc('bytes.received', Oct),
|
||||||
parse_incoming(Data, State);
|
parse_incoming(Data, State);
|
||||||
|
|
||||||
handle_msg({quic, Data, _Sock, _, _, _}, State) ->
|
handle_msg({quic, Data, _Sock, _, _, _}, State) ->
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{msg => "RECV_data", data => Data, transport => quic}),
|
||||||
msg => "RECV_data",
|
|
||||||
data => Data
|
|
||||||
}),
|
|
||||||
Oct = iolist_size(Data),
|
Oct = iolist_size(Data),
|
||||||
inc_counter(incoming_bytes, Oct),
|
inc_counter(incoming_bytes, Oct),
|
||||||
ok = emqx_metrics:inc('bytes.received', Oct),
|
ok = emqx_metrics:inc('bytes.received', Oct),
|
||||||
|
@ -495,7 +489,7 @@ handle_msg({connack, ConnAck}, State) ->
|
||||||
handle_outgoing(ConnAck, State);
|
handle_outgoing(ConnAck, State);
|
||||||
|
|
||||||
handle_msg({close, Reason}, State) ->
|
handle_msg({close, Reason}, State) ->
|
||||||
?SLOG(debug, #{msg => "force_to_close_the_socket", reason => Reason}),
|
?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
handle_info({sock_closed, Reason}, close_socket(State));
|
||||||
|
|
||||||
handle_msg({event, connected}, State = #state{channel = Channel}) ->
|
handle_msg({event, connected}, State = #state{channel = Channel}) ->
|
||||||
|
@ -661,7 +655,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
||||||
?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState)
|
?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState)
|
||||||
, input_bytes => Data
|
, input_bytes => Data
|
||||||
, parsed_packets => Packets
|
, parsed_packets => Packets
|
||||||
, exception => Reason
|
, reason => Reason
|
||||||
, stacktrace => Stacktrace
|
, stacktrace => Stacktrace
|
||||||
}),
|
}),
|
||||||
{[{frame_error, Reason} | Packets], State}
|
{[{frame_error, Reason} | Packets], State}
|
||||||
|
@ -678,10 +672,7 @@ next_incoming_msgs(Packets) ->
|
||||||
|
|
||||||
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
|
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
|
||||||
ok = inc_incoming_stats(Packet),
|
ok = inc_incoming_stats(Packet),
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{msg => "RECV_packet", packet => Packet}),
|
||||||
msg => "RECV_packet",
|
|
||||||
packet => Packet
|
|
||||||
}),
|
|
||||||
with_channel(handle_in, [Packet], State);
|
with_channel(handle_in, [Packet], State);
|
||||||
|
|
||||||
handle_incoming(FrameError, State) ->
|
handle_incoming(FrameError, State) ->
|
||||||
|
@ -718,7 +709,8 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
||||||
fun(Packet) ->
|
fun(Packet) ->
|
||||||
try emqx_frame:serialize_pkt(Packet, Serialize) of
|
try emqx_frame:serialize_pkt(Packet, Serialize) of
|
||||||
<<>> -> ?SLOG(warning, #{
|
<<>> -> ?SLOG(warning, #{
|
||||||
msg => "packet_is_discarded_because_the frame_is_too_large",
|
msg => "packet_is_discarded",
|
||||||
|
reason => "frame_is_too_large",
|
||||||
packet => emqx_packet:format(Packet)
|
packet => emqx_packet:format(Packet)
|
||||||
}),
|
}),
|
||||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||||
|
|
|
@ -185,13 +185,13 @@ handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq})
|
||||||
case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of
|
case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of
|
||||||
[] -> ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts});
|
[] -> ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts});
|
||||||
[[OriginSeq] | _] ->
|
[[OriginSeq] | _] ->
|
||||||
?SLOG(warning, #{msg => "CMD is overidden", cmd => Cmd, mf => MF}),
|
?SLOG(warning, #{msg => "CMD_overidden", cmd => Cmd, mf => MF}),
|
||||||
true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts})
|
true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts})
|
||||||
end,
|
end,
|
||||||
{reply, ok, next_seq(State)};
|
{reply, ok, next_seq(State)};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast({unregister_command, Cmd}, State) ->
|
handle_cast({unregister_command, Cmd}, State) ->
|
||||||
|
@ -199,7 +199,7 @@ handle_cast({unregister_command, Cmd}, State) ->
|
||||||
noreply(State);
|
noreply(State);
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
noreply(State).
|
noreply(State).
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
|
@ -106,7 +106,7 @@ init([]) ->
|
||||||
{ok, #{}, hibernate}.
|
{ok, #{}, hibernate}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast({detected, #flapping{clientid = ClientId,
|
handle_cast({detected, #flapping{clientid = ClientId,
|
||||||
|
@ -116,10 +116,10 @@ handle_cast({detected, #flapping{clientid = ClientId,
|
||||||
#{window_time := WindTime, ban_time := Interval}}, State) ->
|
#{window_time := WindTime, ban_time := Interval}}, State) ->
|
||||||
case now_diff(StartedAt) < WindTime of
|
case now_diff(StartedAt) < WindTime of
|
||||||
true -> %% Flapping happened:(
|
true -> %% Flapping happened:(
|
||||||
?SLOG(error, #{
|
?SLOG(warning, #{
|
||||||
msg => "flapping_detected",
|
msg => "flapping_detected",
|
||||||
client_id => ClientId,
|
client_id => ClientId,
|
||||||
peer_host => inet:ntoa(PeerHost),
|
peer_host => fmt_host(PeerHost),
|
||||||
detect_cnt => DetectCnt,
|
detect_cnt => DetectCnt,
|
||||||
wind_time_in_ms => WindTime
|
wind_time_in_ms => WindTime
|
||||||
}),
|
}),
|
||||||
|
@ -134,7 +134,7 @@ handle_cast({detected, #flapping{clientid = ClientId,
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "client_disconnected",
|
msg => "client_disconnected",
|
||||||
client_id => ClientId,
|
client_id => ClientId,
|
||||||
peer_host => inet:ntoa(PeerHost),
|
peer_host => fmt_host(PeerHost),
|
||||||
detect_cnt => DetectCnt,
|
detect_cnt => DetectCnt,
|
||||||
interval => Interval
|
interval => Interval
|
||||||
})
|
})
|
||||||
|
@ -142,7 +142,7 @@ handle_cast({detected, #flapping{clientid = ClientId,
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
|
handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
|
||||||
|
@ -171,3 +171,8 @@ start_timers() ->
|
||||||
lists:foreach(fun({Zone, _ZoneConf}) ->
|
lists:foreach(fun({Zone, _ZoneConf}) ->
|
||||||
start_timer(Zone)
|
start_timer(Zone)
|
||||||
end, maps:to_list(emqx:get_config([zones], #{}))).
|
end, maps:to_list(emqx:get_config([zones], #{}))).
|
||||||
|
|
||||||
|
fmt_host(PeerHost) ->
|
||||||
|
try inet:ntoa(PeerHost)
|
||||||
|
catch _:_ -> PeerHost
|
||||||
|
end.
|
||||||
|
|
|
@ -208,10 +208,11 @@ safe_execute({M, F, A}, Args) ->
|
||||||
Error:Reason:Stacktrace ->
|
Error:Reason:Stacktrace ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "failed_to_execute",
|
msg => "failed_to_execute",
|
||||||
module_function_arity => {M, F, A},
|
exception => Error,
|
||||||
error_reason_stacktrace => {Error, Reason, Stacktrace}
|
reason => Reason,
|
||||||
}),
|
stacktrace => Stacktrace,
|
||||||
ok
|
failed_call => {M, F, A}
|
||||||
|
})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc execute a function.
|
%% @doc execute a function.
|
||||||
|
|
|
@ -66,6 +66,7 @@
|
||||||
|
|
||||||
-export([ to_packet/2
|
-export([ to_packet/2
|
||||||
, to_map/1
|
, to_map/1
|
||||||
|
, to_log_map/1
|
||||||
, to_list/1
|
, to_list/1
|
||||||
, from_map/1
|
, from_map/1
|
||||||
]).
|
]).
|
||||||
|
@ -79,11 +80,10 @@
|
||||||
headers := emqx_types:headers(),
|
headers := emqx_types:headers(),
|
||||||
topic := emqx_types:topic(),
|
topic := emqx_types:topic(),
|
||||||
payload := emqx_types:payload(),
|
payload := emqx_types:payload(),
|
||||||
timestamp := integer()}
|
timestamp := integer(),
|
||||||
|
extra := _}
|
||||||
).
|
).
|
||||||
|
|
||||||
-export([format/1]).
|
|
||||||
|
|
||||||
-elvis([{elvis_style, god_modules, disable}]).
|
-elvis([{elvis_style, god_modules, disable}]).
|
||||||
|
|
||||||
-spec(make(emqx_types:topic(), emqx_types:payload()) -> emqx_types:message()).
|
-spec(make(emqx_types:topic(), emqx_types:payload()) -> emqx_types:message()).
|
||||||
|
@ -292,7 +292,8 @@ to_map(#message{
|
||||||
headers = Headers,
|
headers = Headers,
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
payload = Payload,
|
payload = Payload,
|
||||||
timestamp = Timestamp
|
timestamp = Timestamp,
|
||||||
|
extra = Extra
|
||||||
}) ->
|
}) ->
|
||||||
#{id => Id,
|
#{id => Id,
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
|
@ -301,9 +302,13 @@ to_map(#message{
|
||||||
headers => Headers,
|
headers => Headers,
|
||||||
topic => Topic,
|
topic => Topic,
|
||||||
payload => Payload,
|
payload => Payload,
|
||||||
timestamp => Timestamp
|
timestamp => Timestamp,
|
||||||
|
extra => Extra
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
%% @doc To map for logging, with payload dropped.
|
||||||
|
to_log_map(Msg) -> maps:without([payload], to_map(Msg)).
|
||||||
|
|
||||||
%% @doc Message to tuple list
|
%% @doc Message to tuple list
|
||||||
-spec(to_list(emqx_types:message()) -> list()).
|
-spec(to_list(emqx_types:message()) -> list()).
|
||||||
to_list(Msg) ->
|
to_list(Msg) ->
|
||||||
|
@ -318,7 +323,8 @@ from_map(#{id := Id,
|
||||||
headers := Headers,
|
headers := Headers,
|
||||||
topic := Topic,
|
topic := Topic,
|
||||||
payload := Payload,
|
payload := Payload,
|
||||||
timestamp := Timestamp
|
timestamp := Timestamp,
|
||||||
|
extra := Extra
|
||||||
}) ->
|
}) ->
|
||||||
#message{
|
#message{
|
||||||
id = Id,
|
id = Id,
|
||||||
|
@ -328,24 +334,10 @@ from_map(#{id := Id,
|
||||||
headers = Headers,
|
headers = Headers,
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
payload = Payload,
|
payload = Payload,
|
||||||
timestamp = Timestamp
|
timestamp = Timestamp,
|
||||||
|
extra = Extra
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%% MilliSeconds
|
%% MilliSeconds
|
||||||
elapsed(Since) ->
|
elapsed(Since) ->
|
||||||
max(0, erlang:system_time(millisecond) - Since).
|
max(0, erlang:system_time(millisecond) - Since).
|
||||||
|
|
||||||
format(#message{id = Id,
|
|
||||||
qos = QoS,
|
|
||||||
topic = Topic,
|
|
||||||
from = From,
|
|
||||||
flags = Flags,
|
|
||||||
headers = Headers}) ->
|
|
||||||
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)",
|
|
||||||
[Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]).
|
|
||||||
|
|
||||||
format(flags, Flags) ->
|
|
||||||
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
|
|
||||||
format(headers, Headers) ->
|
|
||||||
io_lib:format("~p", [Headers]).
|
|
||||||
|
|
||||||
|
|
|
@ -87,10 +87,7 @@ handle_call(Req, _From, State) ->
|
||||||
{reply, {error, {unexpected_call, Req}}, State}.
|
{reply, {error, {unexpected_call, Req}}, State}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "unexpected_cast", cast=> Msg}),
|
||||||
msg => "unexpected_cast_discarded",
|
|
||||||
payload => Msg
|
|
||||||
}),
|
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, _Timer, check}, State) ->
|
handle_info({timeout, _Timer, check}, State) ->
|
||||||
|
@ -112,10 +109,7 @@ handle_info({timeout, _Timer, check}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?SLOG(info, #{
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
msg => "unexpected_info_discarded",
|
|
||||||
info => Info
|
|
||||||
}),
|
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
|
|
|
@ -29,8 +29,6 @@
|
||||||
, find_plugin/1
|
, find_plugin/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([funlog/2]).
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
@ -50,16 +48,14 @@ load() ->
|
||||||
load(PluginName) when is_atom(PluginName) ->
|
load(PluginName) when is_atom(PluginName) ->
|
||||||
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
|
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
|
||||||
{false, _} ->
|
{false, _} ->
|
||||||
?SLOG(alert, #{
|
?SLOG(alert, #{msg => "failed_to_load_plugin",
|
||||||
msg => "plugin_not_found_cannot_load",
|
plugin_name => PluginName,
|
||||||
plugin_name => PluginName
|
reason => not_found}),
|
||||||
}),
|
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
{_, true} ->
|
{_, true} ->
|
||||||
?SLOG(notice, #{
|
?SLOG(notice, #{msg => "plugin_already_loaded",
|
||||||
msg => "plugin_already_started",
|
plugin_name => PluginName,
|
||||||
plugin_name => PluginName
|
reason => already_loaded}),
|
||||||
}),
|
|
||||||
{error, already_started};
|
{error, already_started};
|
||||||
{_, false} ->
|
{_, false} ->
|
||||||
load_plugin(PluginName)
|
load_plugin(PluginName)
|
||||||
|
@ -75,16 +71,14 @@ unload() ->
|
||||||
unload(PluginName) when is_atom(PluginName) ->
|
unload(PluginName) when is_atom(PluginName) ->
|
||||||
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
|
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
|
||||||
{false, _} ->
|
{false, _} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "fialed_to_unload_plugin",
|
||||||
msg => "plugin_not_found_cannot_load",
|
plugin_name => PluginName,
|
||||||
plugin_name => PluginName
|
reason => not_found}),
|
||||||
}),
|
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
{_, false} ->
|
{_, false} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "failed_to_unload_plugin",
|
||||||
msg => "plugin_not_started",
|
plugin_name => PluginName,
|
||||||
plugin_name => PluginName
|
reason => not_loaded}),
|
||||||
}),
|
|
||||||
{error, not_started};
|
{error, not_started};
|
||||||
{_, _} ->
|
{_, _} ->
|
||||||
unload_plugin(PluginName)
|
unload_plugin(PluginName)
|
||||||
|
@ -93,10 +87,9 @@ unload(PluginName) when is_atom(PluginName) ->
|
||||||
reload(PluginName) when is_atom(PluginName)->
|
reload(PluginName) when is_atom(PluginName)->
|
||||||
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
|
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
|
||||||
{false, _} ->
|
{false, _} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "failed_to_reload_plugin",
|
||||||
msg => "plugin_not_found_cannot_load",
|
plugin_name => PluginName,
|
||||||
plugin_name => PluginName
|
reason => not_found}),
|
||||||
}),
|
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
{_, false} ->
|
{_, false} ->
|
||||||
load(PluginName);
|
load(PluginName);
|
||||||
|
@ -142,20 +135,14 @@ load_ext_plugins(Dir) ->
|
||||||
end, filelib:wildcard("*", Dir)).
|
end, filelib:wildcard("*", Dir)).
|
||||||
|
|
||||||
load_ext_plugin(PluginDir) ->
|
load_ext_plugin(PluginDir) ->
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{msg => "loading_extra_plugin", plugin_dir => PluginDir}),
|
||||||
msg => "loading_extra_plugin",
|
|
||||||
plugin_dir => PluginDir
|
|
||||||
}),
|
|
||||||
Ebin = filename:join([PluginDir, "ebin"]),
|
Ebin = filename:join([PluginDir, "ebin"]),
|
||||||
AppFile = filename:join([Ebin, "*.app"]),
|
AppFile = filename:join([Ebin, "*.app"]),
|
||||||
AppName = case filelib:wildcard(AppFile) of
|
AppName = case filelib:wildcard(AppFile) of
|
||||||
[App] ->
|
[App] ->
|
||||||
list_to_atom(filename:basename(App, ".app"));
|
list_to_atom(filename:basename(App, ".app"));
|
||||||
[] ->
|
[] ->
|
||||||
?SLOG(alert, #{
|
?SLOG(alert, #{msg => "plugin_app_file_not_found", app_file => AppFile}),
|
||||||
msg => "plugin_app_file_not_found",
|
|
||||||
app_file => AppFile
|
|
||||||
}),
|
|
||||||
error({plugin_app_file_not_found, AppFile})
|
error({plugin_app_file_not_found, AppFile})
|
||||||
end,
|
end,
|
||||||
ok = load_plugin_app(AppName, Ebin).
|
ok = load_plugin_app(AppName, Ebin).
|
||||||
|
@ -205,12 +192,13 @@ load_plugin(Name) ->
|
||||||
{error, Error0} ->
|
{error, Error0} ->
|
||||||
{error, Error0}
|
{error, Error0}
|
||||||
end
|
end
|
||||||
catch _ : Error : Stacktrace ->
|
catch Error : Reason : Stacktrace ->
|
||||||
?SLOG(alert, #{
|
?SLOG(alert, #{
|
||||||
msg => "plugin_load_failed",
|
msg => "plugin_load_failed",
|
||||||
name => Name,
|
name => Name,
|
||||||
error => Error,
|
exception => Error,
|
||||||
stk => Stacktrace
|
reason => Reason,
|
||||||
|
stacktrace => Stacktrace
|
||||||
}),
|
}),
|
||||||
{error, parse_config_file_failed}
|
{error, parse_config_file_failed}
|
||||||
end.
|
end.
|
||||||
|
@ -228,23 +216,19 @@ load_app(App) ->
|
||||||
start_app(App) ->
|
start_app(App) ->
|
||||||
case application:ensure_all_started(App) of
|
case application:ensure_all_started(App) of
|
||||||
{ok, Started} ->
|
{ok, Started} ->
|
||||||
?SLOG(info, #{
|
case Started =/= [] of
|
||||||
msg => "all_started_plugins",
|
true -> ?SLOG(info, #{msg => "started_plugin_dependency_apps", apps => Started});
|
||||||
started => Started
|
false -> ok
|
||||||
}),
|
end,
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{msg => "started_plugin_app", app => App}),
|
||||||
msg => "load_plugin_app_successfully",
|
|
||||||
app => App
|
|
||||||
}),
|
|
||||||
ok;
|
ok;
|
||||||
{error, {ErrApp, Reason}} ->
|
{error, {ErrApp, Reason}} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => failed_to_start_plugin_app,
|
||||||
msg => "load_plugin_failed_cannot_started",
|
|
||||||
app => App,
|
app => App,
|
||||||
err_app => ErrApp,
|
err_app => ErrApp,
|
||||||
reason => Reason
|
reason => Reason
|
||||||
}),
|
}),
|
||||||
{error, {ErrApp, Reason}}
|
{error, failed_to_start_plugin_app}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
unload_plugin(App) ->
|
unload_plugin(App) ->
|
||||||
|
@ -258,20 +242,13 @@ unload_plugin(App) ->
|
||||||
stop_app(App) ->
|
stop_app(App) ->
|
||||||
case application:stop(App) of
|
case application:stop(App) of
|
||||||
ok ->
|
ok ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{msg => "stop_plugin_successfully", app => App}),
|
||||||
msg => "stop_plugin_successfully",
|
|
||||||
app => App
|
|
||||||
}),
|
|
||||||
ok;
|
ok;
|
||||||
{error, {not_started, App}} ->
|
{error, {not_started, App}} ->
|
||||||
?SLOG(error, #{
|
?SLOG(info, #{msg => "plugin_not_started", app => App}),
|
||||||
msg => "plugin_not_started",
|
|
||||||
app => App
|
|
||||||
}),
|
|
||||||
ok;
|
ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "failed_to_stop_plugin_app",
|
||||||
msg => "stop_plugin",
|
|
||||||
app => App,
|
app => App,
|
||||||
error => Reason
|
error => Reason
|
||||||
}),
|
}),
|
||||||
|
@ -286,9 +263,3 @@ names(started_app) ->
|
||||||
|
|
||||||
names(Plugins) ->
|
names(Plugins) ->
|
||||||
[Name || #plugin{name = Name} <- Plugins].
|
[Name || #plugin{name = Name} <- Plugins].
|
||||||
|
|
||||||
funlog(Key, Value) ->
|
|
||||||
?SLOG(info, #{
|
|
||||||
key => string:join(Key, "."),
|
|
||||||
value => Value
|
|
||||||
}).
|
|
||||||
|
|
|
@ -100,22 +100,22 @@ handle_call({submit, Task}, _From, State) ->
|
||||||
{reply, catch run(Task), State};
|
{reply, catch run(Task), State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast({async_submit, Task}, State) ->
|
handle_cast({async_submit, Task}, State) ->
|
||||||
try run(Task)
|
try run(Task)
|
||||||
catch _:Error:Stacktrace ->
|
catch Error:Reason:Stacktrace ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "async_submit_error",
|
||||||
msg => "error",
|
exception => Error,
|
||||||
error => Error,
|
reason => Reason,
|
||||||
stk => Stacktrace
|
stacktrace => Stacktrace
|
||||||
})
|
})
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
|
@ -203,11 +203,11 @@ handle_call({delete_route, Topic, Dest}, _From, State) ->
|
||||||
{reply, Ok, State};
|
{reply, Ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
|
@ -109,11 +109,11 @@ init([]) ->
|
||||||
{ok, #{nodes => Nodes}, hibernate}.
|
{ok, #{nodes => Nodes}, hibernate}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}},
|
handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}},
|
||||||
|
@ -130,10 +130,7 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info({mnesia_table_event, Event}, State) ->
|
handle_info({mnesia_table_event, Event}, State) ->
|
||||||
?SLOG(error,#{
|
?SLOG(error,#{msg => "unexpected_mnesia_table_event", event => Event}),
|
||||||
msg => "unexpected_mnesia_table_event",
|
|
||||||
event => Event
|
|
||||||
}),
|
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
|
handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
|
||||||
|
|
|
@ -479,16 +479,12 @@ log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
|
||||||
case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of
|
case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
|
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => "dropped_qos0_msg",
|
||||||
msg => "dropped_qos0_msg",
|
payload => emqx_message:to_log_map(Msg)});
|
||||||
payload => emqx_message:format(Msg)
|
|
||||||
});
|
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
|
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => "dropped_msg_due_to_mqueue_is_full",
|
||||||
msg => "dropped_msg_due_to_mqueue_is_full",
|
payload => emqx_message:to_log_map(Msg)})
|
||||||
payload => emqx_message:format(Msg)
|
|
||||||
})
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
enrich_fun(Session = #session{subscriptions = Subs}) ->
|
enrich_fun(Session = #session{subscriptions = Subs}) ->
|
||||||
|
|
|
@ -347,11 +347,8 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P
|
||||||
handle_info({mnesia_table_event, _Event}, State) ->
|
handle_info({mnesia_table_event, _Event}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
|
handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{pmon = PMon}) ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{msg => "shared_subscriber_down", sub_pid => SubPid, reason => Reason}),
|
||||||
msg => "shared_subscriber_down",
|
|
||||||
sub_pid => SubPid
|
|
||||||
}),
|
|
||||||
cleanup_down(SubPid),
|
cleanup_down(SubPid),
|
||||||
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
|
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
|
||||||
|
|
||||||
|
|
|
@ -202,7 +202,7 @@ handle_call(stop, _From, State) ->
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast({setstat, Stat, MaxStat, Val}, State) ->
|
handle_cast({setstat, Stat, MaxStat, Val}, State) ->
|
||||||
|
@ -221,8 +221,7 @@ handle_cast({update_interval, Update = #update{name = Name}},
|
||||||
State = #state{updates = Updates}) ->
|
State = #state{updates = Updates}) ->
|
||||||
NState = case lists:keyfind(Name, #update.name, Updates) of
|
NState = case lists:keyfind(Name, #update.name, Updates) of
|
||||||
#update{} ->
|
#update{} ->
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => "duplicated_update",
|
||||||
msg => "duplicated_update",
|
|
||||||
name => Name
|
name => Name
|
||||||
}),
|
}),
|
||||||
State;
|
State;
|
||||||
|
@ -235,7 +234,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) ->
|
||||||
{noreply, State#state{updates = Updates1}};
|
{noreply, State#state{updates = Updates1}};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) ->
|
handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) ->
|
||||||
|
@ -244,11 +243,12 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
|
||||||
func = UpFun}, Acc) when C =< 0 ->
|
func = UpFun}, Acc) when C =< 0 ->
|
||||||
try UpFun()
|
try UpFun()
|
||||||
catch
|
catch
|
||||||
_:Error ->
|
Error : Reason : Stacktrace ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "update_name_failed",
|
||||||
msg => "update_name_failed",
|
|
||||||
name => Name,
|
name => Name,
|
||||||
error => Error
|
exception => Error,
|
||||||
|
reason => Reason,
|
||||||
|
stacktrace => Stacktrace
|
||||||
})
|
})
|
||||||
end,
|
end,
|
||||||
[Update#update{countdown = I} | Acc];
|
[Update#update{countdown = I} | Acc];
|
||||||
|
@ -284,4 +284,3 @@ safe_update_element(Key, Val) ->
|
||||||
val => Val
|
val => Val
|
||||||
})
|
})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -134,11 +134,11 @@ handle_call(uptime, _From, State) ->
|
||||||
{reply, uptime(State), State};
|
{reply, uptime(State), State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
|
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
|
||||||
|
|
|
@ -93,10 +93,10 @@ handle_cast(Msg, State) ->
|
||||||
handle_info({monitor, Pid, long_gc, Info}, State) ->
|
handle_info({monitor, Pid, long_gc, Info}, State) ->
|
||||||
suppress({long_gc, Pid},
|
suppress({long_gc, Pid},
|
||||||
fun() ->
|
fun() ->
|
||||||
WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]),
|
WarnMsg = io_lib:format("long_gc warning: pid = ~p", [Pid]),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => long_gc,
|
||||||
warn_msg => WarnMsg,
|
info => Info,
|
||||||
pid_info => procinfo(Pid)
|
porcinfo => procinfo(Pid)
|
||||||
}),
|
}),
|
||||||
safe_publish(long_gc, WarnMsg)
|
safe_publish(long_gc, WarnMsg)
|
||||||
end, State);
|
end, State);
|
||||||
|
@ -104,33 +104,30 @@ handle_info({monitor, Pid, long_gc, Info}, State) ->
|
||||||
handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
|
handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
|
||||||
suppress({long_schedule, Pid},
|
suppress({long_schedule, Pid},
|
||||||
fun() ->
|
fun() ->
|
||||||
WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]),
|
WarnMsg = io_lib:format("long_schedule warning: pid = ~p", [Pid]),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => long_schedule,
|
||||||
warn_msg => WarnMsg,
|
info => Info,
|
||||||
pid_info => procinfo(Pid)
|
procinfo => procinfo(Pid)}),
|
||||||
}),
|
|
||||||
safe_publish(long_schedule, WarnMsg)
|
safe_publish(long_schedule, WarnMsg)
|
||||||
end, State);
|
end, State);
|
||||||
|
|
||||||
handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
|
handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
|
||||||
suppress({long_schedule, Port},
|
suppress({long_schedule, Port},
|
||||||
fun() ->
|
fun() ->
|
||||||
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
|
WarnMsg = io_lib:format("long_schedule warning: port = ~p", [Port]),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => long_schedule,
|
||||||
warn_msg => WarnMsg,
|
info => Info,
|
||||||
port_info => erlang:port_info(Port)
|
portinfo => portinfo(Port)}),
|
||||||
}),
|
|
||||||
safe_publish(long_schedule, WarnMsg)
|
safe_publish(long_schedule, WarnMsg)
|
||||||
end, State);
|
end, State);
|
||||||
|
|
||||||
handle_info({monitor, Pid, large_heap, Info}, State) ->
|
handle_info({monitor, Pid, large_heap, Info}, State) ->
|
||||||
suppress({large_heap, Pid},
|
suppress({large_heap, Pid},
|
||||||
fun() ->
|
fun() ->
|
||||||
WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]),
|
WarnMsg = io_lib:format("large_heap warning: pid = ~p", [Pid]),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => large_heap,
|
||||||
warn_msg => WarnMsg,
|
info => Info,
|
||||||
pid_info => procinfo(Pid)
|
procinfo => procinfo(Pid)}),
|
||||||
}),
|
|
||||||
safe_publish(large_heap, WarnMsg)
|
safe_publish(large_heap, WarnMsg)
|
||||||
end, State);
|
end, State);
|
||||||
|
|
||||||
|
@ -138,10 +135,9 @@ handle_info({monitor, SusPid, busy_port, Port}, State) ->
|
||||||
suppress({busy_port, Port},
|
suppress({busy_port, Port},
|
||||||
fun() ->
|
fun() ->
|
||||||
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
|
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => busy_port,
|
||||||
warn_msg => WarnMsg,
|
portinfo => portinfo(Port),
|
||||||
pid_info => procinfo(SusPid),
|
procinfo => procinfo(SusPid)
|
||||||
port_info => erlang:port_info(Port)
|
|
||||||
}),
|
}),
|
||||||
safe_publish(busy_port, WarnMsg)
|
safe_publish(busy_port, WarnMsg)
|
||||||
end, State);
|
end, State);
|
||||||
|
@ -150,11 +146,9 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
|
||||||
suppress({busy_dist_port, Port},
|
suppress({busy_dist_port, Port},
|
||||||
fun() ->
|
fun() ->
|
||||||
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
|
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{msg => busy_dist_port,
|
||||||
warn_msg => WarnMsg,
|
portinfo => portinfo(Port),
|
||||||
pid_info => procinfo(SusPid),
|
procinfo => procinfo(SusPid)}),
|
||||||
port_info => erlang:port_info(Port)
|
|
||||||
}),
|
|
||||||
safe_publish(busy_dist_port, WarnMsg)
|
safe_publish(busy_dist_port, WarnMsg)
|
||||||
end, State);
|
end, State);
|
||||||
|
|
||||||
|
@ -190,11 +184,14 @@ suppress(Key, SuccFun, State = #{events := Events}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
procinfo(Pid) ->
|
procinfo(Pid) ->
|
||||||
case {emqx_vm:get_process_info(Pid), emqx_vm:get_process_gc_info(Pid)} of
|
[{pid, Pid} | procinfo_l(emqx_vm:get_process_gc_info(Pid))] ++
|
||||||
{undefined, _} -> undefined;
|
procinfo_l(emqx_vm:get_process_info(Pid)).
|
||||||
{_, undefined} -> undefined;
|
|
||||||
{Info, GcInfo} -> Info ++ GcInfo
|
procinfo_l(undefined) -> [];
|
||||||
end.
|
procinfo_l(List) -> List.
|
||||||
|
|
||||||
|
portinfo(Port) ->
|
||||||
|
[{port, Port} | erlang:port_info(Port)].
|
||||||
|
|
||||||
safe_publish(Event, WarnMsg) ->
|
safe_publish(Event, WarnMsg) ->
|
||||||
Topic = emqx_topic:systop(lists:concat(['sysmon/', Event])),
|
Topic = emqx_topic:systop(lists:concat(['sysmon/', Event])),
|
||||||
|
|
|
@ -115,25 +115,18 @@ install_trace_handler(Who, Level, LogFile) ->
|
||||||
{fun filter_by_meta_key/2, Who}}]})
|
{fun filter_by_meta_key/2, Who}}]})
|
||||||
of
|
of
|
||||||
ok ->
|
ok ->
|
||||||
?SLOG(info, #{msg => "start_trace_for", who => Who});
|
?SLOG(info, #{msg => "start_trace", who => Who});
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{msg => "start_trace_for_who_failed", who => Who, reason => Reason}),
|
?SLOG(error, #{msg => "failed_to_trace", who => Who, reason => Reason}),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
uninstall_trance_handler(Who) ->
|
uninstall_trance_handler(Who) ->
|
||||||
case logger:remove_handler(handler_id(Who)) of
|
case logger:remove_handler(handler_id(Who)) of
|
||||||
ok ->
|
ok ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{msg => "stop_trace", who => Who});
|
||||||
msg => "stop_trace_for",
|
|
||||||
who => Who
|
|
||||||
});
|
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "failed_to_stop_trace", who => Who, reason => Reason}),
|
||||||
msg => "stop_trace_for",
|
|
||||||
who => Who,
|
|
||||||
reason => Reason
|
|
||||||
}),
|
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -49,17 +49,11 @@ init([]) ->
|
||||||
{ok, #{}}.
|
{ok, #{}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
msg => "[VM_MON]_unexpected_call",
|
|
||||||
req => Req
|
|
||||||
}),
|
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
msg => "[VM_MON]_unexpected_cast",
|
|
||||||
cast => Msg
|
|
||||||
}),
|
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, _Timer, check}, State) ->
|
handle_info({timeout, _Timer, check}, State) ->
|
||||||
|
@ -81,10 +75,7 @@ handle_info({timeout, _Timer, check}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
msg => "[VM_MON]_unexpected_info",
|
|
||||||
info => Info
|
|
||||||
}),
|
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
|
|
|
@ -181,13 +181,11 @@ init(Req, #{listener := {Type, Listener}} = Opts) ->
|
||||||
idle_timeout => get_ws_opts(Type, Listener, idle_timeout)
|
idle_timeout => get_ws_opts(Type, Listener, idle_timeout)
|
||||||
},
|
},
|
||||||
case check_origin_header(Req, Opts) of
|
case check_origin_header(Req, Opts) of
|
||||||
{error, Message} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{msg => "invalid_origin_header", reason => Reason}),
|
||||||
msg => "invalid_origin_header",
|
|
||||||
payload => Message
|
|
||||||
}),
|
|
||||||
{ok, cowboy_req:reply(403, Req), WsOpts};
|
{ok, cowboy_req:reply(403, Req), WsOpts};
|
||||||
ok -> parse_sec_websocket_protocol(Req, Opts, WsOpts)
|
ok ->
|
||||||
|
parse_sec_websocket_protocol(Req, Opts, WsOpts)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
parse_sec_websocket_protocol(Req, #{listener := {Type, Listener}} = Opts, WsOpts) ->
|
parse_sec_websocket_protocol(Req, #{listener := {Type, Listener}} = Opts, WsOpts) ->
|
||||||
|
@ -234,7 +232,7 @@ parse_header_fun_origin(Req, #{listener := {Type, Listener}}) ->
|
||||||
Value ->
|
Value ->
|
||||||
case lists:member(Value, get_ws_opts(Type, Listener, check_origins)) of
|
case lists:member(Value, get_ws_opts(Type, Listener, check_origins)) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> {origin_not_allowed, Value}
|
false -> {error, #{bad_origin => Value}}
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -266,11 +264,11 @@ websocket_init([Req, #{zone := Zone, listener := {Type, Listener}} = Opts]) ->
|
||||||
WsCookie = try cowboy_req:parse_cookies(Req)
|
WsCookie = try cowboy_req:parse_cookies(Req)
|
||||||
catch
|
catch
|
||||||
error:badarg ->
|
error:badarg ->
|
||||||
?SLOG(error, #{msg => "illegal_cookie"}),
|
?SLOG(error, #{msg => "bad_cookie"}),
|
||||||
undefined;
|
undefined;
|
||||||
Error:Reason ->
|
Error:Reason ->
|
||||||
?SLOG(error, #{msg => "failed_to_parse_cookie",
|
?SLOG(error, #{msg => "failed_to_parse_cookie",
|
||||||
error => Error,
|
exception => Error,
|
||||||
reason => Reason}),
|
reason => Reason}),
|
||||||
undefined
|
undefined
|
||||||
end,
|
end,
|
||||||
|
@ -328,7 +326,7 @@ websocket_handle({binary, Data}, State) when is_list(Data) ->
|
||||||
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
||||||
|
|
||||||
websocket_handle({binary, Data}, State) ->
|
websocket_handle({binary, Data}, State) ->
|
||||||
?SLOG(debug, #{msg => "recv_data", data => Data}),
|
?SLOG(debug, #{msg => "RECV_data", data => Data, transport => websocket}),
|
||||||
ok = inc_recv_stats(1, iolist_size(Data)),
|
ok = inc_recv_stats(1, iolist_size(Data)),
|
||||||
NState = ensure_stats_timer(State),
|
NState = ensure_stats_timer(State),
|
||||||
return(parse_incoming(Data, NState));
|
return(parse_incoming(Data, NState));
|
||||||
|
@ -450,7 +448,7 @@ handle_info({connack, ConnAck}, State) ->
|
||||||
return(enqueue(ConnAck, State));
|
return(enqueue(ConnAck, State));
|
||||||
|
|
||||||
handle_info({close, Reason}, State) ->
|
handle_info({close, Reason}, State) ->
|
||||||
?SLOG(debug, #{msg => "force_to_close_the_socket", reason => Reason}),
|
?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
|
||||||
return(enqueue({close, Reason}, State));
|
return(enqueue({close, Reason}, State));
|
||||||
|
|
||||||
handle_info({event, connected}, State = #state{channel = Channel}) ->
|
handle_info({event, connected}, State = #state{channel = Channel}) ->
|
||||||
|
@ -632,10 +630,9 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback,
|
||||||
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
||||||
fun(Packet) ->
|
fun(Packet) ->
|
||||||
try emqx_frame:serialize_pkt(Packet, Serialize) of
|
try emqx_frame:serialize_pkt(Packet, Serialize) of
|
||||||
<<>> -> ?SLOG(warning, #{
|
<<>> -> ?SLOG(warning, #{msg => "packet_discarded",
|
||||||
msg => "packet_is_discarded_due_to_the_frame_is_too_large",
|
reason => "frame_too_large",
|
||||||
packet => emqx_packet:format(Packet)
|
packet => emqx_packet:format(Packet)}),
|
||||||
}),
|
|
||||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||||
ok = emqx_metrics:inc('delivery.dropped'),
|
ok = emqx_metrics:inc('delivery.dropped'),
|
||||||
<<>>;
|
<<>>;
|
||||||
|
|
|
@ -141,13 +141,6 @@ t_undefined_headers(_) ->
|
||||||
Msg2 = emqx_message:set_header(c, 3, Msg),
|
Msg2 = emqx_message:set_header(c, 3, Msg),
|
||||||
?assertEqual(3, emqx_message:get_header(c, Msg2)).
|
?assertEqual(3, emqx_message:get_header(c, Msg2)).
|
||||||
|
|
||||||
t_format(_) ->
|
|
||||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
|
||||||
io:format("~s~n", [emqx_message:format(Msg)]),
|
|
||||||
Msg1 = emqx_message:set_header(properties, #{'Subscription-Identifier' => 1},
|
|
||||||
emqx_message:set_flag(dup, Msg)),
|
|
||||||
io:format("~s~n", [emqx_message:format(Msg1)]).
|
|
||||||
|
|
||||||
t_is_expired(_) ->
|
t_is_expired(_) ->
|
||||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||||
?assertNot(emqx_message:is_expired(Msg)),
|
?assertNot(emqx_message:is_expired(Msg)),
|
||||||
|
@ -206,7 +199,9 @@ t_to_map(_) ->
|
||||||
{headers, #{}},
|
{headers, #{}},
|
||||||
{topic, <<"topic">>},
|
{topic, <<"topic">>},
|
||||||
{payload, <<"payload">>},
|
{payload, <<"payload">>},
|
||||||
{timestamp, emqx_message:timestamp(Msg)}],
|
{timestamp, emqx_message:timestamp(Msg)},
|
||||||
|
{extra, []}
|
||||||
|
],
|
||||||
?assertEqual(List, emqx_message:to_list(Msg)),
|
?assertEqual(List, emqx_message:to_list(Msg)),
|
||||||
?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)).
|
?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)).
|
||||||
|
|
||||||
|
@ -219,6 +214,8 @@ t_from_map(_) ->
|
||||||
headers => #{},
|
headers => #{},
|
||||||
topic => <<"topic">>,
|
topic => <<"topic">>,
|
||||||
payload => <<"payload">>,
|
payload => <<"payload">>,
|
||||||
timestamp => emqx_message:timestamp(Msg)},
|
timestamp => emqx_message:timestamp(Msg),
|
||||||
|
extra => []
|
||||||
|
},
|
||||||
?assertEqual(Map, emqx_message:to_map(Msg)),
|
?assertEqual(Map, emqx_message:to_map(Msg)),
|
||||||
?assertEqual(Msg, emqx_message:from_map(emqx_message:to_map(Msg))).
|
?assertEqual(Msg, emqx_message:from_map(emqx_message:to_map(Msg))).
|
||||||
|
|
|
@ -0,0 +1,483 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% The proper types extension for EMQ X
|
||||||
|
|
||||||
|
-module(emqx_proper_types).
|
||||||
|
|
||||||
|
-include_lib("proper/include/proper.hrl").
|
||||||
|
-include("emqx.hrl").
|
||||||
|
|
||||||
|
%% High level Types
|
||||||
|
-export([ conninfo/0
|
||||||
|
, clientinfo/0
|
||||||
|
, sessioninfo/0
|
||||||
|
, connack_return_code/0
|
||||||
|
, message/0
|
||||||
|
, topictab/0
|
||||||
|
, topic/0
|
||||||
|
, systopic/0
|
||||||
|
, subopts/0
|
||||||
|
, nodename/0
|
||||||
|
, normal_topic/0
|
||||||
|
, normal_topic_filter/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Basic Types
|
||||||
|
-export([ url/0
|
||||||
|
, ip/0
|
||||||
|
, port/0
|
||||||
|
, limited_atom/0
|
||||||
|
, limited_latin_atom/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Iterators
|
||||||
|
-export([ nof/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Types High level
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% Type defined emqx_types.erl - conninfo()
|
||||||
|
conninfo() ->
|
||||||
|
Keys = [{socktype, socktype()},
|
||||||
|
{sockname, peername()},
|
||||||
|
{peername, peername()},
|
||||||
|
{peercert, peercert()},
|
||||||
|
{conn_mod, conn_mod()},
|
||||||
|
{proto_name, proto_name()},
|
||||||
|
{proto_ver, non_neg_integer()},
|
||||||
|
{clean_start, boolean()},
|
||||||
|
{clientid, clientid()},
|
||||||
|
{username, username()},
|
||||||
|
{conn_props, properties()},
|
||||||
|
{connected, boolean()},
|
||||||
|
{connected_at, timestamp()},
|
||||||
|
{keepalive, range(0, 16#ffff)},
|
||||||
|
{receive_maximum, non_neg_integer()},
|
||||||
|
{expiry_interval, non_neg_integer()}],
|
||||||
|
?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())},
|
||||||
|
begin
|
||||||
|
maps:merge(maps:from_list(Ks), M)
|
||||||
|
end).
|
||||||
|
|
||||||
|
clientinfo() ->
|
||||||
|
Keys = [{zone, zone()},
|
||||||
|
{protocol, protocol()},
|
||||||
|
{peerhost, ip()},
|
||||||
|
{sockport, port()},
|
||||||
|
{clientid, clientid()},
|
||||||
|
{username, username()},
|
||||||
|
{is_bridge, boolean()},
|
||||||
|
{is_supuser, boolean()},
|
||||||
|
{mountpoint, maybe(utf8())},
|
||||||
|
{ws_cookie, maybe(list())}
|
||||||
|
% password,
|
||||||
|
% auth_result,
|
||||||
|
% anonymous,
|
||||||
|
% cn,
|
||||||
|
% dn,
|
||||||
|
],
|
||||||
|
?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())},
|
||||||
|
begin
|
||||||
|
maps:merge(maps:from_list(Ks), M)
|
||||||
|
end).
|
||||||
|
|
||||||
|
%% See emqx_session:session() type define
|
||||||
|
sessioninfo() ->
|
||||||
|
?LET(Session, {session,
|
||||||
|
subscriptions(), % subscriptions
|
||||||
|
non_neg_integer(), % max_subscriptions
|
||||||
|
boolean(), % upgrade_qos
|
||||||
|
inflight(), % emqx_inflight:inflight()
|
||||||
|
mqueue(), % emqx_mqueue:mqueue()
|
||||||
|
packet_id(), % next_pkt_id
|
||||||
|
safty_timeout(), % retry_interval
|
||||||
|
awaiting_rel(), % awaiting_rel
|
||||||
|
non_neg_integer(), % max_awaiting_rel
|
||||||
|
safty_timeout(), % await_rel_timeout
|
||||||
|
timestamp() % created_at
|
||||||
|
},
|
||||||
|
emqx_session:info(Session)).
|
||||||
|
|
||||||
|
subscriptions() ->
|
||||||
|
?LET(L, list({topic(), subopts()}), maps:from_list(L)).
|
||||||
|
|
||||||
|
inflight() ->
|
||||||
|
?LET(MaxLen, non_neg_integer(),
|
||||||
|
begin
|
||||||
|
?LET(Msgs, limited_list(MaxLen, {packet_id(), message(), timestamp()}),
|
||||||
|
begin
|
||||||
|
lists:foldl(fun({PktId, Msg, Ts}, Ift) ->
|
||||||
|
try
|
||||||
|
emqx_inflight:insert(PktId, {Msg, Ts}, Ift)
|
||||||
|
catch _:_ ->
|
||||||
|
Ift
|
||||||
|
end
|
||||||
|
end, emqx_inflight:new(MaxLen), Msgs)
|
||||||
|
end)
|
||||||
|
end).
|
||||||
|
|
||||||
|
mqueue() ->
|
||||||
|
?LET({MaxLen, IsStoreQos0}, {non_neg_integer(), boolean()},
|
||||||
|
begin
|
||||||
|
?LET(Msgs, limited_list(MaxLen, message()),
|
||||||
|
begin
|
||||||
|
Q = emqx_mqueue:init(#{max_len => MaxLen, store_qos0 => IsStoreQos0}),
|
||||||
|
lists:foldl(fun(Msg, Acc) ->
|
||||||
|
{_Dropped, NQ} = emqx_mqueue:in(Msg, Acc),
|
||||||
|
NQ
|
||||||
|
end, Q, Msgs)
|
||||||
|
end)
|
||||||
|
end).
|
||||||
|
|
||||||
|
message() ->
|
||||||
|
#message{
|
||||||
|
id = emqx_guid:gen(),
|
||||||
|
qos = qos(),
|
||||||
|
from = from(),
|
||||||
|
flags = flags(),
|
||||||
|
headers = map(limited_latin_atom(), limited_any_term()), %% headers
|
||||||
|
topic = topic(),
|
||||||
|
payload = payload(),
|
||||||
|
timestamp = timestamp(),
|
||||||
|
extra = []
|
||||||
|
}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
flags() ->
|
||||||
|
?LET({Dup, Retain}, {boolean(), boolean()}, #{dup => Dup, retain => Retain}).
|
||||||
|
|
||||||
|
packet_id() ->
|
||||||
|
range(1, 16#ffff).
|
||||||
|
|
||||||
|
awaiting_rel() ->
|
||||||
|
?LET(L, list({packet_id(), timestamp()}), maps:from_list(L)).
|
||||||
|
|
||||||
|
connack_return_code() ->
|
||||||
|
oneof([ success
|
||||||
|
, protocol_error
|
||||||
|
, client_identifier_not_valid
|
||||||
|
, bad_username_or_password
|
||||||
|
, bad_clientid_or_password
|
||||||
|
, username_or_password_undefined
|
||||||
|
, password_error
|
||||||
|
, not_authorized
|
||||||
|
, server_unavailable
|
||||||
|
, server_busy
|
||||||
|
, banned
|
||||||
|
, bad_authentication_method
|
||||||
|
]).
|
||||||
|
|
||||||
|
topictab() ->
|
||||||
|
non_empty(list({topic(), subopts()})).
|
||||||
|
|
||||||
|
topic() ->
|
||||||
|
oneof([normal_topic(),
|
||||||
|
normal_topic_filter(),
|
||||||
|
systopic_broker(), systopic_present(), systopic_stats(),
|
||||||
|
systopic_metrics(), systopic_alarms(), systopic_mon(),
|
||||||
|
sharetopic()]).
|
||||||
|
|
||||||
|
subopts() ->
|
||||||
|
?LET({Nl, Qos, Rap, Rh},
|
||||||
|
{range(0, 1), qos(),
|
||||||
|
range(0, 1), range(0, 1)},
|
||||||
|
#{nl => Nl, qos => Qos, rap => Rap, rh => Rh}).
|
||||||
|
|
||||||
|
qos() ->
|
||||||
|
range(0, 2).
|
||||||
|
|
||||||
|
from() ->
|
||||||
|
oneof([limited_latin_atom()]).
|
||||||
|
|
||||||
|
payload() ->
|
||||||
|
binary().
|
||||||
|
|
||||||
|
safty_timeout() ->
|
||||||
|
non_neg_integer().
|
||||||
|
|
||||||
|
nodename() ->
|
||||||
|
?LET({Name, Ip}, {non_empty(list(latin_char())), ip()},
|
||||||
|
begin
|
||||||
|
binary_to_atom(iolist_to_binary([Name, "@", inet:ntoa(Ip)]), utf8)
|
||||||
|
end).
|
||||||
|
|
||||||
|
systopic() ->
|
||||||
|
oneof(
|
||||||
|
[systopic_broker(), systopic_present(), systopic_stats(),
|
||||||
|
systopic_metrics(), systopic_alarms(), systopic_mon()]).
|
||||||
|
|
||||||
|
systopic_broker() ->
|
||||||
|
Topics = [<<"">>, <<"version">>, <<"uptime">>, <<"datetime">>, <<"sysdescr">>],
|
||||||
|
?LET({Nodename, T},
|
||||||
|
{nodename(), oneof(Topics)},
|
||||||
|
begin
|
||||||
|
case byte_size(T) of
|
||||||
|
0 -> <<"$SYS/brokers">>;
|
||||||
|
_ ->
|
||||||
|
<<"$SYS/brokers/", (ensure_bin(Nodename))/binary, "/", T/binary>>
|
||||||
|
end
|
||||||
|
end).
|
||||||
|
|
||||||
|
systopic_present() ->
|
||||||
|
?LET({Nodename, ClientId, T},
|
||||||
|
{nodename(), clientid(), oneof([<<"connected">>, <<"disconnected">>])},
|
||||||
|
begin
|
||||||
|
<<"$SYS/brokers/", (ensure_bin(Nodename))/binary, "/clients/", (ensure_bin(ClientId))/binary, "/", T/binary>>
|
||||||
|
end).
|
||||||
|
|
||||||
|
systopic_stats() ->
|
||||||
|
Topics = [<<"connections/max">>, <<"connections/count">>,
|
||||||
|
<<"suboptions/max">>, <<"suboptions/count">>,
|
||||||
|
<<"subscribers/max">>, <<"subscribers/count">>,
|
||||||
|
<<"subscriptions/max">>, <<"subscriptions/count">>,
|
||||||
|
<<"subscriptions/shared/max">>, <<"subscriptions/shared/count">>,
|
||||||
|
<<"topics/max">>, <<"topics/count">>,
|
||||||
|
<<"routes/max">>, <<"routes/count">>
|
||||||
|
],
|
||||||
|
?LET({Nodename, T},
|
||||||
|
{nodename(), oneof(Topics)},
|
||||||
|
<<"$SYS/brokers/", (ensure_bin(Nodename))/binary, "/stats/", T/binary>>).
|
||||||
|
|
||||||
|
systopic_metrics() ->
|
||||||
|
Topics = [<<"bytes/received">>, <<"bytes/sent">>,
|
||||||
|
<<"packets/received">>, <<"packets/sent">>,
|
||||||
|
<<"packets/connect/received">>, <<"packets/connack/sent">>,
|
||||||
|
<<"packets/publish/received">>, <<"packets/publish/sent">>,
|
||||||
|
<<"packets/publish/error">>, <<"packets/publish/auth_error">>,
|
||||||
|
<<"packets/publish/dropped">>,
|
||||||
|
<<"packets/puback/received">>, <<"packets/puback/sent">>,
|
||||||
|
<<"packets/puback/inuse">>, <<"packets/puback/missed">>,
|
||||||
|
<<"packets/pubrec/received">>, <<"packets/pubrec/sent">>,
|
||||||
|
<<"packets/pubrec/inuse">>, <<"packets/pubrec/missed">>,
|
||||||
|
<<"packets/pubrel/received">>, <<"packets/pubrel/sent">>,
|
||||||
|
<<"packets/pubrel/missed">>,
|
||||||
|
<<"packets/pubcomp/received">>, <<"packets/pubcomp/sent">>,
|
||||||
|
<<"packets/pubcomp/inuse">>, <<"packets/pubcomp/missed">>,
|
||||||
|
<<"packets/subscribe/received">>, <<"packets/subscribe/error">>,
|
||||||
|
<<"packets/subscribe/auth_error">>, <<"packets/suback/sent">>,
|
||||||
|
<<"packets/unsubscribe/received">>, <<"packets/unsuback/sent">>,
|
||||||
|
<<"packets/pingreq/received">>, <<"packets/pingresp/sent">>,
|
||||||
|
<<"packets/disconnect/received">>, <<"packets/disconnect/sent">>,
|
||||||
|
<<"packets/auth/received">>, <<"packets/auth/sent">>,
|
||||||
|
<<"messages/received">>, <<"messages/sent">>,
|
||||||
|
<<"messages/qos0/received">>, <<"messages/qos0/sent">>,
|
||||||
|
<<"messages/qos1/received">>, <<"messages/qos1/sent">>,
|
||||||
|
<<"messages/qos2/received">>, <<"messages/qos2/sent">>,
|
||||||
|
<<"messages/publish">>, <<"messages/dropped">>,
|
||||||
|
<<"messages/dropped/expired">>, <<"messages/dropped/no_subscribers">>,
|
||||||
|
<<"messages/forward">>, <<"messages/retained">>,
|
||||||
|
<<"messages/delayed">>, <<"messages/delivered">>,
|
||||||
|
<<"messages/acked">>],
|
||||||
|
?LET({Nodename, T},
|
||||||
|
{nodename(), oneof(Topics)},
|
||||||
|
<<"$SYS/brokers/", (ensure_bin(Nodename))/binary, "/metrics/", T/binary>>).
|
||||||
|
|
||||||
|
systopic_alarms() ->
|
||||||
|
?LET({Nodename, T},
|
||||||
|
{nodename(), oneof([<<"alert">>, <<"clear">>])},
|
||||||
|
<<"$SYS/brokers/", (ensure_bin(Nodename))/binary, "/alarms/", T/binary>>).
|
||||||
|
|
||||||
|
systopic_mon() ->
|
||||||
|
Topics = [<<"long_gc">>, <<"long_schedule">>,
|
||||||
|
<<"large_heap">>, <<"busy_port">>, <<"busy_dist_port">>],
|
||||||
|
?LET({Nodename, T},
|
||||||
|
{nodename(), oneof(Topics)},
|
||||||
|
<<"$SYS/brokers/", (ensure_bin(Nodename))/binary, "/sysmon/", T/binary>>).
|
||||||
|
|
||||||
|
sharetopic() ->
|
||||||
|
?LET({Type, Grp, T},
|
||||||
|
{oneof([<<"$queue">>, <<"$share">>]), list(latin_char()), normal_topic()},
|
||||||
|
<<Type/binary, "/", (iolist_to_binary(Grp))/binary, "/", T/binary>>).
|
||||||
|
|
||||||
|
normal_topic() ->
|
||||||
|
?LET(L, list(frequency([{3, latin_char()}, {1, $/}])),
|
||||||
|
list_to_binary(L)).
|
||||||
|
|
||||||
|
normal_topic_filter() ->
|
||||||
|
?LET({L, Wild}, {list(list(latin_char())), oneof(['#', '+'])},
|
||||||
|
begin
|
||||||
|
case Wild of
|
||||||
|
'#' ->
|
||||||
|
case L of
|
||||||
|
[] -> <<"#">>;
|
||||||
|
_ -> iolist_to_binary([lists:join("/", L), "/#"])
|
||||||
|
end;
|
||||||
|
'+' ->
|
||||||
|
case L of
|
||||||
|
[] -> <<"+">>;
|
||||||
|
_ ->
|
||||||
|
L1 = [case rand:uniform(3) == 1 of
|
||||||
|
true -> "+";
|
||||||
|
_ -> E
|
||||||
|
end || E <- L],
|
||||||
|
iolist_to_binary(lists:join("/", L1))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Basic Types
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
maybe(T) ->
|
||||||
|
oneof([undefined, T]).
|
||||||
|
|
||||||
|
socktype() ->
|
||||||
|
oneof([tcp, udp, ssl, proxy]).
|
||||||
|
|
||||||
|
peername() ->
|
||||||
|
{ip(), port()}.
|
||||||
|
|
||||||
|
peercert() ->
|
||||||
|
%% TODO: cert?
|
||||||
|
oneof([nossl, undefined]).
|
||||||
|
|
||||||
|
conn_mod() ->
|
||||||
|
oneof([emqx_connection, emqx_ws_connection, emqx_coap_mqtt_adapter,
|
||||||
|
emqx_sn_gateway, emqx_lwm2m_protocol, emqx_gbt32960_conn,
|
||||||
|
emqx_jt808_connection, emqx_tcp_connection]).
|
||||||
|
|
||||||
|
proto_name() ->
|
||||||
|
oneof([<<"MQTT">>, <<"MQTT-SN">>, <<"CoAP">>, <<"LwM2M">>, utf8()]).
|
||||||
|
|
||||||
|
clientid() ->
|
||||||
|
utf8().
|
||||||
|
|
||||||
|
username() ->
|
||||||
|
maybe(utf8()).
|
||||||
|
|
||||||
|
properties() ->
|
||||||
|
map(limited_latin_atom(), binary()).
|
||||||
|
|
||||||
|
%% millisecond
|
||||||
|
timestamp() ->
|
||||||
|
%% 12h <- Now -> 12h
|
||||||
|
?LET(Offset, range(-43200, 43200), erlang:system_time(millisecond) + Offset).
|
||||||
|
|
||||||
|
zone() ->
|
||||||
|
oneof([external, internal, limited_latin_atom()]).
|
||||||
|
|
||||||
|
protocol() ->
|
||||||
|
oneof([mqtt, 'mqtt-sn', coap, lwm2m, limited_latin_atom()]).
|
||||||
|
|
||||||
|
url() ->
|
||||||
|
?LET({Schema, IP, Port, Path}, {oneof(["http://", "https://"]), ip(), port(), http_path()},
|
||||||
|
begin
|
||||||
|
IP1 = case tuple_size(IP) == 8 of
|
||||||
|
true -> "[" ++ inet:ntoa(IP) ++ "]";
|
||||||
|
false -> inet:ntoa(IP)
|
||||||
|
end,
|
||||||
|
lists:concat([Schema, IP1, ":", integer_to_list(Port), "/", Path])
|
||||||
|
end).
|
||||||
|
|
||||||
|
ip() ->
|
||||||
|
oneof([ipv4(), ipv6(), ipv6_from_v4()]).
|
||||||
|
|
||||||
|
ipv4() ->
|
||||||
|
?LET(IP, {range(1, 16#ff), range(0, 16#ff),
|
||||||
|
range(0, 16#ff), range(0, 16#ff)}, IP).
|
||||||
|
|
||||||
|
ipv6() ->
|
||||||
|
?LET(IP, {range(0, 16#ff), range(0, 16#ff),
|
||||||
|
range(0, 16#ff), range(0, 16#ff),
|
||||||
|
range(0, 16#ff), range(0, 16#ff),
|
||||||
|
range(0, 16#ff), range(0, 16#ff)}, IP).
|
||||||
|
|
||||||
|
ipv6_from_v4() ->
|
||||||
|
?LET(IP, {range(1, 16#ff), range(0, 16#ff),
|
||||||
|
range(0, 16#ff), range(0, 16#ff)},
|
||||||
|
inet:ipv4_mapped_ipv6_address(IP)).
|
||||||
|
|
||||||
|
port() ->
|
||||||
|
?LET(Port, range(1, 16#ffff), Port).
|
||||||
|
|
||||||
|
http_path() ->
|
||||||
|
list(frequency([{3, latin_char()},
|
||||||
|
{1, $/}])).
|
||||||
|
|
||||||
|
latin_char() ->
|
||||||
|
oneof([integer($0, $9), integer($A, $Z), integer($a, $z)]).
|
||||||
|
|
||||||
|
limited_latin_atom() ->
|
||||||
|
oneof([ 'abc_atom'
|
||||||
|
, '0123456789'
|
||||||
|
, 'ABC-ATOM'
|
||||||
|
, 'abc123ABC'
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Avoid generating a lot of atom and causing atom table overflows
|
||||||
|
limited_atom() ->
|
||||||
|
oneof([ 'a_normal_atom'
|
||||||
|
, '10123_num_prefixed_atom'
|
||||||
|
, '___dash_prefixed_atom'
|
||||||
|
, '123'
|
||||||
|
, binary_to_atom(<<"你好_utf8_atom"/utf8>>)
|
||||||
|
, '_', ' ', '""', '#$%^&*'
|
||||||
|
%% The longest atom with 255 chars
|
||||||
|
, list_to_atom(
|
||||||
|
lists:append([ "so"
|
||||||
|
, [ $o || _ <- lists:seq(1, 243)]
|
||||||
|
, "-long-atom"]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]).
|
||||||
|
|
||||||
|
limited_any_term() ->
|
||||||
|
oneof([binary(), number(), string()]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Iterators
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
nof(Ls) when is_list(Ls) ->
|
||||||
|
Len = length(Ls),
|
||||||
|
?LET(N, range(0, Len),
|
||||||
|
begin
|
||||||
|
Ns = rand_nl(N, Len, []),
|
||||||
|
[lists:nth(I, Ls) || I <- Ns]
|
||||||
|
end).
|
||||||
|
|
||||||
|
limited_list(0, T) ->
|
||||||
|
list(T);
|
||||||
|
|
||||||
|
limited_list(N, T) ->
|
||||||
|
?LET(N2, range(0, N),
|
||||||
|
begin
|
||||||
|
[T || _ <- lists:seq(1, N2)]
|
||||||
|
end).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal funcs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-compile({inline, rand_nl/3}).
|
||||||
|
|
||||||
|
rand_nl(0, _, Acc) ->
|
||||||
|
Acc;
|
||||||
|
rand_nl(N, L, Acc) ->
|
||||||
|
R = rand:uniform(L),
|
||||||
|
case lists:member(R, Acc) of
|
||||||
|
true -> rand_nl(N, L, Acc);
|
||||||
|
_ -> rand_nl(N-1, L, [R|Acc])
|
||||||
|
end.
|
||||||
|
|
||||||
|
ensure_bin(A) when is_atom(A) ->
|
||||||
|
atom_to_binary(A, utf8);
|
||||||
|
ensure_bin(B) when is_binary(B) ->
|
||||||
|
B.
|
|
@ -24,21 +24,22 @@
|
||||||
|
|
||||||
-define(SYSMON, emqx_sys_mon).
|
-define(SYSMON, emqx_sys_mon).
|
||||||
|
|
||||||
|
-define(FAKE_PORT, hd(erlang:ports())).
|
||||||
|
-define(FAKE_INFO, [{timeout, 100}, {in, foo}, {out, {?MODULE, bar, 1}}]).
|
||||||
-define(INPUTINFO, [{self(), long_gc,
|
-define(INPUTINFO, [{self(), long_gc,
|
||||||
concat_str("long_gc warning: pid = ~p, info: ~p", self(), "hello"), "hello"},
|
fmt("long_gc warning: pid = ~p", [self()]), ?FAKE_INFO},
|
||||||
{self(), long_schedule,
|
{self(), long_schedule,
|
||||||
concat_str("long_schedule warning: pid = ~p, info: ~p", self(), "hello"), "hello"},
|
fmt("long_schedule warning: pid = ~p", [self()]), ?FAKE_INFO},
|
||||||
{self(), large_heap,
|
{self(), large_heap,
|
||||||
concat_str("large_heap warning: pid = ~p, info: ~p", self(), "hello"), "hello"},
|
fmt("large_heap warning: pid = ~p", [self()]), ?FAKE_INFO},
|
||||||
{self(), busy_port,
|
{self(), busy_port,
|
||||||
concat_str("busy_port warning: suspid = ~p, port = ~p",
|
fmt("busy_port warning: suspid = ~p, port = ~p",
|
||||||
self(), list_to_port("#Port<0.4>")), list_to_port("#Port<0.4>")},
|
[self(), ?FAKE_PORT]), ?FAKE_PORT},
|
||||||
{self(), busy_dist_port,
|
{self(), busy_dist_port,
|
||||||
concat_str("busy_dist_port warning: suspid = ~p, port = ~p",
|
fmt("busy_dist_port warning: suspid = ~p, port = ~p",
|
||||||
self(), list_to_port("#Port<0.4>")),list_to_port("#Port<0.4>")},
|
[self(), ?FAKE_PORT]), ?FAKE_PORT},
|
||||||
{list_to_port("#Port<0.4>"), long_schedule,
|
{?FAKE_PORT, long_schedule,
|
||||||
concat_str("long_schedule warning: port = ~p, info: ~p",
|
fmt("long_schedule warning: port = ~p", [?FAKE_PORT]), ?FAKE_INFO}
|
||||||
list_to_port("#Port<0.4>"), "hello"), "hello"}
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
@ -82,16 +83,16 @@ t_procinfo(_) ->
|
||||||
ok = meck:new(emqx_vm, [passthrough, no_history]),
|
ok = meck:new(emqx_vm, [passthrough, no_history]),
|
||||||
ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end),
|
ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end),
|
||||||
ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> [] end),
|
ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> [] end),
|
||||||
?assertEqual([], emqx_sys_mon:procinfo([])),
|
?assertEqual([{pid, undefined}], emqx_sys_mon:procinfo(undefined)),
|
||||||
ok = meck:expect(emqx_vm, get_process_info, fun(_) -> ok end),
|
ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end),
|
||||||
ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> undefined end),
|
ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> undefined end),
|
||||||
?assertEqual(undefined, emqx_sys_mon:procinfo([])),
|
?assertEqual([{pid, self()}], emqx_sys_mon:procinfo(self())),
|
||||||
ok = meck:unload(emqx_vm).
|
ok = meck:unload(emqx_vm).
|
||||||
|
|
||||||
t_sys_mon(_Config) ->
|
t_sys_mon(_Config) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun({PidOrPort, SysMonName,ValidateInfo, InfoOrPort}) ->
|
fun({PidOrPort, SysMonName, ValidateInfo, InfoOrPort}) ->
|
||||||
validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort)
|
validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort)
|
||||||
end, ?INPUTINFO).
|
end, ?INPUTINFO).
|
||||||
|
|
||||||
t_sys_mon2(_Config) ->
|
t_sys_mon2(_Config) ->
|
||||||
|
@ -101,7 +102,7 @@ t_sys_mon2(_Config) ->
|
||||||
?assertEqual(ok, gen_server:cast(?SYSMON, ignored)),
|
?assertEqual(ok, gen_server:cast(?SYSMON, ignored)),
|
||||||
gen_server:stop(?SYSMON).
|
gen_server:stop(?SYSMON).
|
||||||
|
|
||||||
validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) ->
|
validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort) ->
|
||||||
{ok, C} = emqtt:start_link([{host, "localhost"}]),
|
{ok, C} = emqtt:start_link([{host, "localhost"}]),
|
||||||
{ok, _} = emqtt:connect(C),
|
{ok, _} = emqtt:connect(C),
|
||||||
emqtt:subscribe(C, emqx_topic:systop(lists:concat(['sysmon/', SysMonName])), qos1),
|
emqtt:subscribe(C, emqx_topic:systop(lists:concat(['sysmon/', SysMonName])), qos1),
|
||||||
|
@ -117,6 +118,4 @@ validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) ->
|
||||||
end,
|
end,
|
||||||
emqtt:stop(C).
|
emqtt:stop(C).
|
||||||
|
|
||||||
concat_str(ValidateInfo, InfoOrPort, Info) ->
|
fmt(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).
|
||||||
WarnInfo = io_lib:format(ValidateInfo, [InfoOrPort, Info]),
|
|
||||||
lists:flatten(WarnInfo).
|
|
||||||
|
|
|
@ -229,7 +229,7 @@ t_ws_check_origin(_) ->
|
||||||
?assertMatch({gun_upgrade, _},
|
?assertMatch({gun_upgrade, _},
|
||||||
start_ws_client(#{protocols => [<<"mqtt">>],
|
start_ws_client(#{protocols => [<<"mqtt">>],
|
||||||
headers => [{<<"origin">>, <<"http://localhost:18083">>}]})),
|
headers => [{<<"origin">>, <<"http://localhost:18083">>}]})),
|
||||||
?assertMatch({gun_response, {_, 500, _}},
|
?assertMatch({gun_response, {_, 403, _}},
|
||||||
start_ws_client(#{protocols => [<<"mqtt">>],
|
start_ws_client(#{protocols => [<<"mqtt">>],
|
||||||
headers => [{<<"origin">>, <<"http://localhost:18080">>}]})).
|
headers => [{<<"origin">>, <<"http://localhost:18080">>}]})).
|
||||||
|
|
||||||
|
|
|
@ -135,7 +135,7 @@ json_basic() ->
|
||||||
oneof([true, false, null, number(), json_string()]).
|
oneof([true, false, null, number(), json_string()]).
|
||||||
|
|
||||||
latin_atom() ->
|
latin_atom() ->
|
||||||
emqx_ct_proper_types:limited_latin_atom().
|
emqx_proper_types:limited_latin_atom().
|
||||||
|
|
||||||
json_string() -> utf8().
|
json_string() -> utf8().
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
-include_lib("proper/include/proper.hrl").
|
-include_lib("proper/include/proper.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-import(emqx_ct_proper_types,
|
-import(emqx_proper_types,
|
||||||
[ conninfo/0
|
[ conninfo/0
|
||||||
, clientinfo/0
|
, clientinfo/0
|
||||||
, sessioninfo/0
|
, sessioninfo/0
|
||||||
|
@ -503,7 +503,7 @@ unsub_properties() ->
|
||||||
#{}.
|
#{}.
|
||||||
|
|
||||||
shutdown_reason() ->
|
shutdown_reason() ->
|
||||||
oneof([utf8(), {shutdown, emqx_ct_proper_types:limited_atom()}]).
|
oneof([utf8(), {shutdown, emqx_proper_types:limited_atom()}]).
|
||||||
|
|
||||||
authresult() ->
|
authresult() ->
|
||||||
?LET(RC, connack_return_code(),
|
?LET(RC, connack_return_code(),
|
||||||
|
|
|
@ -24,8 +24,7 @@
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-import(emqx_ct_proper_types,
|
-import(emqx_proper_types, [topic/0]).
|
||||||
[topic/0]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Messages
|
%% Messages
|
||||||
|
|
Loading…
Reference in New Issue