style(elvis): fix elvis style complaints
This commit is contained in:
parent
8fe342a02d
commit
60d5017eea
|
@ -103,7 +103,7 @@
|
||||||
|
|
||||||
-type(reply() :: {outgoing, emqx_types:packet()}
|
-type(reply() :: {outgoing, emqx_types:packet()}
|
||||||
| {outgoing, [emqx_types:packet()]}
|
| {outgoing, [emqx_types:packet()]}
|
||||||
| {event, conn_state()|updated}
|
| {event, conn_state() | updated}
|
||||||
| {close, Reason :: atom()}).
|
| {close, Reason :: atom()}).
|
||||||
|
|
||||||
-type(replies() :: emqx_types:packet() | reply() | [reply()]).
|
-type(replies() :: emqx_types:packet() | reply() | [reply()]).
|
||||||
|
@ -132,7 +132,7 @@
|
||||||
info(Channel) ->
|
info(Channel) ->
|
||||||
maps:from_list(info(?INFO_KEYS, Channel)).
|
maps:from_list(info(?INFO_KEYS, Channel)).
|
||||||
|
|
||||||
-spec(info(list(atom())|atom(), channel()) -> term()).
|
-spec(info(list(atom()) | atom(), channel()) -> term()).
|
||||||
info(Keys, Channel) when is_list(Keys) ->
|
info(Keys, Channel) when is_list(Keys) ->
|
||||||
[{Key, info(Key, Channel)} || Key <- Keys];
|
[{Key, info(Key, Channel)} || Key <- Keys];
|
||||||
info(conninfo, #channel{conninfo = ConnInfo}) ->
|
info(conninfo, #channel{conninfo = ConnInfo}) ->
|
||||||
|
@ -328,17 +328,23 @@ handle_in(Packet = ?AUTH_PACKET(ReasonCode, _Properties),
|
||||||
connecting ->
|
connecting ->
|
||||||
process_connect(NProperties, ensure_connected(NChannel));
|
process_connect(NProperties, ensure_connected(NChannel));
|
||||||
_ ->
|
_ ->
|
||||||
handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel#channel{conn_state = connected})
|
handle_out( auth
|
||||||
|
, {?RC_SUCCESS, NProperties}
|
||||||
|
, NChannel#channel{conn_state = connected}
|
||||||
|
)
|
||||||
end;
|
end;
|
||||||
{continue, NProperties, NChannel} ->
|
{continue, NProperties, NChannel} ->
|
||||||
handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel#channel{conn_state = reauthenticating});
|
handle_out( auth
|
||||||
|
, {?RC_CONTINUE_AUTHENTICATION, NProperties}
|
||||||
|
, NChannel#channel{conn_state = reauthenticating}
|
||||||
|
);
|
||||||
{error, NReasonCode} ->
|
{error, NReasonCode} ->
|
||||||
case ConnState of
|
case ConnState of
|
||||||
connecting ->
|
connecting ->
|
||||||
handle_out(connack, NReasonCode, Channel);
|
handle_out(connack, NReasonCode, Channel);
|
||||||
_ ->
|
_ ->
|
||||||
handle_out(disconnect, NReasonCode, Channel)
|
handle_out(disconnect, NReasonCode, Channel)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
catch
|
catch
|
||||||
_Class:_Reason ->
|
_Class:_Reason ->
|
||||||
|
@ -632,7 +638,7 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "dropped_qos2_packet",
|
msg => "dropped_qos2_packet",
|
||||||
reason => emqx_reason_codes:name(RC),
|
reason => emqx_reason_codes:name(RC),
|
||||||
packetId => PacketId
|
packet_id => PacketId
|
||||||
}),
|
}),
|
||||||
ok = emqx_metrics:inc('packets.publish.dropped'),
|
ok = emqx_metrics:inc('packets.publish.dropped'),
|
||||||
handle_out(pubrec, {PacketId, RC}, Channel)
|
handle_out(pubrec, {PacketId, RC}, Channel)
|
||||||
|
@ -655,7 +661,7 @@ ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
|
||||||
|
|
||||||
-compile({inline, [puback_reason_code/1]}).
|
-compile({inline, [puback_reason_code/1]}).
|
||||||
puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
|
puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
|
||||||
puback_reason_code([_|_]) -> ?RC_SUCCESS.
|
puback_reason_code([_ | _]) -> ?RC_SUCCESS.
|
||||||
|
|
||||||
-compile({inline, [after_message_acked/3]}).
|
-compile({inline, [after_message_acked/3]}).
|
||||||
after_message_acked(ClientInfo, Msg, PubAckProps) ->
|
after_message_acked(ClientInfo, Msg, PubAckProps) ->
|
||||||
|
@ -674,7 +680,7 @@ process_subscribe(TopicFilters, SubProps, Channel) ->
|
||||||
process_subscribe([], _SubProps, Channel, Acc) ->
|
process_subscribe([], _SubProps, Channel, Acc) ->
|
||||||
{lists:reverse(Acc), Channel};
|
{lists:reverse(Acc), Channel};
|
||||||
|
|
||||||
process_subscribe([Topic = {TopicFilter, SubOpts}|More], SubProps, Channel, Acc) ->
|
process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Acc) ->
|
||||||
case check_sub_caps(TopicFilter, SubOpts, Channel) of
|
case check_sub_caps(TopicFilter, SubOpts, Channel) of
|
||||||
ok ->
|
ok ->
|
||||||
{ReasonCode, NChannel} = do_subscribe(TopicFilter,
|
{ReasonCode, NChannel} = do_subscribe(TopicFilter,
|
||||||
|
@ -716,9 +722,9 @@ process_unsubscribe(TopicFilters, UnSubProps, Channel) ->
|
||||||
process_unsubscribe([], _UnSubProps, Channel, Acc) ->
|
process_unsubscribe([], _UnSubProps, Channel, Acc) ->
|
||||||
{lists:reverse(Acc), Channel};
|
{lists:reverse(Acc), Channel};
|
||||||
|
|
||||||
process_unsubscribe([{TopicFilter, SubOpts}|More], UnSubProps, Channel, Acc) ->
|
process_unsubscribe([{TopicFilter, SubOpts} | More], UnSubProps, Channel, Acc) ->
|
||||||
{RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts#{unsub_props => UnSubProps}, Channel),
|
{RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts#{unsub_props => UnSubProps}, Channel),
|
||||||
process_unsubscribe(More, UnSubProps, NChannel, [RC|Acc]).
|
process_unsubscribe(More, UnSubProps, NChannel, [RC | Acc]).
|
||||||
|
|
||||||
do_unsubscribe(TopicFilter, SubOpts, Channel =
|
do_unsubscribe(TopicFilter, SubOpts, Channel =
|
||||||
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
|
||||||
|
@ -790,7 +796,9 @@ handle_deliver(Delivers, Channel = #channel{takeover = true,
|
||||||
pendings = Pendings,
|
pendings = Pendings,
|
||||||
session = Session,
|
session = Session,
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
clientinfo = #{clientid := ClientId}}) ->
|
||||||
NPendings = lists:append(Pendings, emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)),
|
NPendings = lists:append(
|
||||||
|
Pendings,
|
||||||
|
emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)),
|
||||||
{ok, Channel#channel{pendings = NPendings}};
|
{ok, Channel#channel{pendings = NPendings}};
|
||||||
|
|
||||||
handle_deliver(Delivers, Channel = #channel{session = Session,
|
handle_deliver(Delivers, Channel = #channel{session = Session,
|
||||||
|
@ -1365,17 +1373,20 @@ authenticate(?AUTH_PACKET(_, #{'Authentication-Method' := AuthMethod} = Properti
|
||||||
{error, ?RC_BAD_AUTHENTICATION_METHOD}
|
{error, ?RC_BAD_AUTHENTICATION_METHOD}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_authenticate(#{auth_method := AuthMethod} = Credential, #channel{clientinfo = ClientInfo} = Channel) ->
|
do_authenticate(#{auth_method := AuthMethod} = Credential,
|
||||||
|
#channel{clientinfo = ClientInfo} = Channel) ->
|
||||||
Properties = #{'Authentication-Method' => AuthMethod},
|
Properties = #{'Authentication-Method' => AuthMethod},
|
||||||
case emqx_access_control:authenticate(Credential) of
|
case emqx_access_control:authenticate(Credential) of
|
||||||
{ok, Result} ->
|
{ok, Result} ->
|
||||||
{ok, Properties,
|
{ok, Properties,
|
||||||
Channel#channel{clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)},
|
Channel#channel{
|
||||||
auth_cache = #{}}};
|
clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)},
|
||||||
|
auth_cache = #{}}};
|
||||||
{ok, Result, AuthData} ->
|
{ok, Result, AuthData} ->
|
||||||
{ok, Properties#{'Authentication-Data' => AuthData},
|
{ok, Properties#{'Authentication-Data' => AuthData},
|
||||||
Channel#channel{clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)},
|
Channel#channel{
|
||||||
auth_cache = #{}}};
|
clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)},
|
||||||
|
auth_cache = #{}}};
|
||||||
{continue, AuthCache} ->
|
{continue, AuthCache} ->
|
||||||
{continue, Properties, Channel#channel{auth_cache = AuthCache}};
|
{continue, Properties, Channel#channel{auth_cache = AuthCache}};
|
||||||
{continue, AuthData, AuthCache} ->
|
{continue, AuthData, AuthCache} ->
|
||||||
|
|
|
@ -102,6 +102,11 @@
|
||||||
|
|
||||||
-define(T_TAKEOVER, 15000).
|
-define(T_TAKEOVER, 15000).
|
||||||
|
|
||||||
|
%% linting overrides
|
||||||
|
-elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}}
|
||||||
|
, {elvis_style, god_modules, #{ignore => [emqx_cm]}}
|
||||||
|
]).
|
||||||
|
|
||||||
%% @doc Start the channel manager.
|
%% @doc Start the channel manager.
|
||||||
-spec(start_link() -> startlink_ret()).
|
-spec(start_link() -> startlink_ret()).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
|
@ -245,7 +250,10 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
pendings => Pendings}};
|
pendings => Pendings}};
|
||||||
{living, ConnMod, ChanPid, Session} ->
|
{living, ConnMod, ChanPid, Session} ->
|
||||||
ok = emqx_session:resume(ClientInfo, Session),
|
ok = emqx_session:resume(ClientInfo, Session),
|
||||||
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
|
Session1 = emqx_persistent_session:persist( ClientInfo
|
||||||
|
, ConnInfo
|
||||||
|
, Session
|
||||||
|
),
|
||||||
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
|
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
|
||||||
register_channel(ClientId, Self, ConnInfo),
|
register_channel(ClientId, Self, ConnInfo),
|
||||||
{ok, #{session => Session1,
|
{ok, #{session => Session1,
|
||||||
|
@ -254,12 +262,18 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
{expired, OldSession} ->
|
{expired, OldSession} ->
|
||||||
_ = emqx_persistent_session:discard(ClientId, OldSession),
|
_ = emqx_persistent_session:discard(ClientId, OldSession),
|
||||||
Session = create_session(ClientInfo, ConnInfo),
|
Session = create_session(ClientInfo, ConnInfo),
|
||||||
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
|
Session1 = emqx_persistent_session:persist( ClientInfo
|
||||||
|
, ConnInfo
|
||||||
|
, Session
|
||||||
|
),
|
||||||
register_channel(ClientId, Self, ConnInfo),
|
register_channel(ClientId, Self, ConnInfo),
|
||||||
{ok, #{session => Session1, present => false}};
|
{ok, #{session => Session1, present => false}};
|
||||||
none ->
|
none ->
|
||||||
Session = create_session(ClientInfo, ConnInfo),
|
Session = create_session(ClientInfo, ConnInfo),
|
||||||
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
|
Session1 = emqx_persistent_session:persist( ClientInfo
|
||||||
|
, ConnInfo
|
||||||
|
, Session
|
||||||
|
),
|
||||||
register_channel(ClientId, Self, ConnInfo),
|
register_channel(ClientId, Self, ConnInfo),
|
||||||
{ok, #{session => Session1, present => false}}
|
{ok, #{session => Session1, present => false}}
|
||||||
end
|
end
|
||||||
|
@ -309,7 +323,7 @@ takeover_session(ClientId) ->
|
||||||
[ChanPid] ->
|
[ChanPid] ->
|
||||||
takeover_session(ClientId, ChanPid);
|
takeover_session(ClientId, ChanPid);
|
||||||
ChanPids ->
|
ChanPids ->
|
||||||
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
[ChanPid | StalePids] = lists:reverse(ChanPids),
|
||||||
?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}),
|
?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}),
|
||||||
lists:foreach(fun(StalePid) ->
|
lists:foreach(fun(StalePid) ->
|
||||||
catch discard_session(ClientId, StalePid)
|
catch discard_session(ClientId, StalePid)
|
||||||
|
@ -374,7 +388,7 @@ kick_session(ClientId) ->
|
||||||
[ChanPid] ->
|
[ChanPid] ->
|
||||||
kick_session(ClientId, ChanPid);
|
kick_session(ClientId, ChanPid);
|
||||||
ChanPids ->
|
ChanPids ->
|
||||||
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
[ChanPid | StalePids] = lists:reverse(ChanPids),
|
||||||
?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}),
|
?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}),
|
||||||
lists:foreach(fun(StalePid) ->
|
lists:foreach(fun(StalePid) ->
|
||||||
catch discard_session(ClientId, StalePid)
|
catch discard_session(ClientId, StalePid)
|
||||||
|
|
|
@ -149,7 +149,7 @@ start_link(Transport, Socket, Options) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Get infos of the connection/channel.
|
%% @doc Get infos of the connection/channel.
|
||||||
-spec(info(pid()|state()) -> emqx_types:infos()).
|
-spec(info(pid() | state()) -> emqx_types:infos()).
|
||||||
info(CPid) when is_pid(CPid) ->
|
info(CPid) when is_pid(CPid) ->
|
||||||
call(CPid, info);
|
call(CPid, info);
|
||||||
info(State = #state{channel = Channel}) ->
|
info(State = #state{channel = Channel}) ->
|
||||||
|
@ -176,7 +176,7 @@ info(limiter, #state{limiter = Limiter}) ->
|
||||||
maybe_apply(fun emqx_limiter:info/1, Limiter).
|
maybe_apply(fun emqx_limiter:info/1, Limiter).
|
||||||
|
|
||||||
%% @doc Get stats of the connection/channel.
|
%% @doc Get stats of the connection/channel.
|
||||||
-spec(stats(pid()|state()) -> emqx_types:stats()).
|
-spec(stats(pid() | state()) -> emqx_types:stats()).
|
||||||
stats(CPid) when is_pid(CPid) ->
|
stats(CPid) when is_pid(CPid) ->
|
||||||
call(CPid, stats);
|
call(CPid, stats);
|
||||||
stats(#state{transport = Transport,
|
stats(#state{transport = Transport,
|
||||||
|
@ -373,7 +373,7 @@ cancel_stats_timer(State) -> State.
|
||||||
|
|
||||||
process_msg([], State) ->
|
process_msg([], State) ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
process_msg([Msg|More], State) ->
|
process_msg([Msg | More], State) ->
|
||||||
try
|
try
|
||||||
case handle_msg(Msg, State) of
|
case handle_msg(Msg, State) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -475,7 +475,7 @@ handle_msg({Passive, _Sock}, State)
|
||||||
handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{
|
handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{
|
||||||
listener = {Type, Listener}} = State) ->
|
listener = {Type, Listener}} = State) ->
|
||||||
ActiveN = get_active_n(Type, Listener),
|
ActiveN = get_active_n(Type, Listener),
|
||||||
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
|
Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)],
|
||||||
with_channel(handle_deliver, [Delivers], State);
|
with_channel(handle_deliver, [Delivers], State);
|
||||||
|
|
||||||
%% Something sent
|
%% Something sent
|
||||||
|
@ -649,7 +649,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
||||||
{Packets, State#state{parse_state = NParseState}};
|
{Packets, State#state{parse_state = NParseState}};
|
||||||
{ok, Packet, Rest, NParseState} ->
|
{ok, Packet, Rest, NParseState} ->
|
||||||
NState = State#state{parse_state = NParseState},
|
NState = State#state{parse_state = NParseState},
|
||||||
parse_incoming(Rest, [Packet|Packets], NState)
|
parse_incoming(Rest, [Packet | Packets], NState)
|
||||||
catch
|
catch
|
||||||
throw : ?FRAME_PARSE_ERROR(Reason) ->
|
throw : ?FRAME_PARSE_ERROR(Reason) ->
|
||||||
?SLOG(info, #{ reason => Reason
|
?SLOG(info, #{ reason => Reason
|
||||||
|
|
|
@ -233,7 +233,7 @@ handle_cast({update_interval, Update = #update{name = Name}},
|
||||||
name => Name
|
name => Name
|
||||||
}),
|
}),
|
||||||
State;
|
State;
|
||||||
false -> State#state{updates = [Update|Updates]}
|
false -> State#state{updates = [Update | Updates]}
|
||||||
end,
|
end,
|
||||||
{noreply, NState};
|
{noreply, NState};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue