diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index af3ac3b2c..84281a500 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -103,7 +103,7 @@ -type(reply() :: {outgoing, emqx_types:packet()} | {outgoing, [emqx_types:packet()]} - | {event, conn_state()|updated} + | {event, conn_state() | updated} | {close, Reason :: atom()}). -type(replies() :: emqx_types:packet() | reply() | [reply()]). @@ -132,7 +132,7 @@ info(Channel) -> maps:from_list(info(?INFO_KEYS, Channel)). --spec(info(list(atom())|atom(), channel()) -> term()). +-spec(info(list(atom()) | atom(), channel()) -> term()). info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; info(conninfo, #channel{conninfo = ConnInfo}) -> @@ -328,17 +328,23 @@ handle_in(Packet = ?AUTH_PACKET(ReasonCode, _Properties), connecting -> process_connect(NProperties, ensure_connected(NChannel)); _ -> - handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel#channel{conn_state = connected}) + handle_out( auth + , {?RC_SUCCESS, NProperties} + , NChannel#channel{conn_state = connected} + ) end; {continue, NProperties, NChannel} -> - handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel#channel{conn_state = reauthenticating}); + handle_out( auth + , {?RC_CONTINUE_AUTHENTICATION, NProperties} + , NChannel#channel{conn_state = reauthenticating} + ); {error, NReasonCode} -> case ConnState of connecting -> handle_out(connack, NReasonCode, Channel); _ -> handle_out(disconnect, NReasonCode, Channel) - end + end end catch _Class:_Reason -> @@ -632,7 +638,7 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, ?SLOG(warning, #{ msg => "dropped_qos2_packet", reason => emqx_reason_codes:name(RC), - packetId => PacketId + packet_id => PacketId }), ok = emqx_metrics:inc('packets.publish.dropped'), handle_out(pubrec, {PacketId, RC}, Channel) @@ -655,7 +661,7 @@ ensure_quota(PubRes, Channel = #channel{quota = Limiter}) -> -compile({inline, [puback_reason_code/1]}). puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; -puback_reason_code([_|_]) -> ?RC_SUCCESS. +puback_reason_code([_ | _]) -> ?RC_SUCCESS. -compile({inline, [after_message_acked/3]}). after_message_acked(ClientInfo, Msg, PubAckProps) -> @@ -674,7 +680,7 @@ process_subscribe(TopicFilters, SubProps, Channel) -> process_subscribe([], _SubProps, Channel, Acc) -> {lists:reverse(Acc), Channel}; -process_subscribe([Topic = {TopicFilter, SubOpts}|More], SubProps, Channel, Acc) -> +process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Acc) -> case check_sub_caps(TopicFilter, SubOpts, Channel) of ok -> {ReasonCode, NChannel} = do_subscribe(TopicFilter, @@ -716,9 +722,9 @@ process_unsubscribe(TopicFilters, UnSubProps, Channel) -> process_unsubscribe([], _UnSubProps, Channel, Acc) -> {lists:reverse(Acc), Channel}; -process_unsubscribe([{TopicFilter, SubOpts}|More], UnSubProps, Channel, Acc) -> +process_unsubscribe([{TopicFilter, SubOpts} | More], UnSubProps, Channel, Acc) -> {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts#{unsub_props => UnSubProps}, Channel), - process_unsubscribe(More, UnSubProps, NChannel, [RC|Acc]). + process_unsubscribe(More, UnSubProps, NChannel, [RC | Acc]). do_unsubscribe(TopicFilter, SubOpts, Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, @@ -790,7 +796,9 @@ handle_deliver(Delivers, Channel = #channel{takeover = true, pendings = Pendings, session = Session, clientinfo = #{clientid := ClientId}}) -> - NPendings = lists:append(Pendings, emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)), + NPendings = lists:append( + Pendings, + emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)), {ok, Channel#channel{pendings = NPendings}}; handle_deliver(Delivers, Channel = #channel{session = Session, @@ -1365,17 +1373,20 @@ authenticate(?AUTH_PACKET(_, #{'Authentication-Method' := AuthMethod} = Properti {error, ?RC_BAD_AUTHENTICATION_METHOD} end. -do_authenticate(#{auth_method := AuthMethod} = Credential, #channel{clientinfo = ClientInfo} = Channel) -> +do_authenticate(#{auth_method := AuthMethod} = Credential, + #channel{clientinfo = ClientInfo} = Channel) -> Properties = #{'Authentication-Method' => AuthMethod}, case emqx_access_control:authenticate(Credential) of {ok, Result} -> {ok, Properties, - Channel#channel{clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)}, - auth_cache = #{}}}; + Channel#channel{ + clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)}, + auth_cache = #{}}}; {ok, Result, AuthData} -> {ok, Properties#{'Authentication-Data' => AuthData}, - Channel#channel{clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)}, - auth_cache = #{}}}; + Channel#channel{ + clientinfo = ClientInfo#{is_superuser => maps:get(is_superuser, Result, false)}, + auth_cache = #{}}}; {continue, AuthCache} -> {continue, Properties, Channel#channel{auth_cache = AuthCache}}; {continue, AuthData, AuthCache} -> diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index d4704c43a..1abd5c151 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -102,6 +102,11 @@ -define(T_TAKEOVER, 15000). +%% linting overrides +-elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}} + , {elvis_style, god_modules, #{ignore => [emqx_cm]}} + ]). + %% @doc Start the channel manager. -spec(start_link() -> startlink_ret()). start_link() -> @@ -245,7 +250,10 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> pendings => Pendings}}; {living, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), + Session1 = emqx_persistent_session:persist( ClientInfo + , ConnInfo + , Session + ), Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session1, @@ -254,12 +262,18 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> {expired, OldSession} -> _ = emqx_persistent_session:discard(ClientId, OldSession), Session = create_session(ClientInfo, ConnInfo), - Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), + Session1 = emqx_persistent_session:persist( ClientInfo + , ConnInfo + , Session + ), register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session1, present => false}}; none -> Session = create_session(ClientInfo, ConnInfo), - Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), + Session1 = emqx_persistent_session:persist( ClientInfo + , ConnInfo + , Session + ), register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session1, present => false}} end @@ -309,7 +323,7 @@ takeover_session(ClientId) -> [ChanPid] -> takeover_session(ClientId, ChanPid); ChanPids -> - [ChanPid|StalePids] = lists:reverse(ChanPids), + [ChanPid | StalePids] = lists:reverse(ChanPids), ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) @@ -374,7 +388,7 @@ kick_session(ClientId) -> [ChanPid] -> kick_session(ClientId, ChanPid); ChanPids -> - [ChanPid|StalePids] = lists:reverse(ChanPids), + [ChanPid | StalePids] = lists:reverse(ChanPids), ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 52caf121b..e4a617675 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -149,7 +149,7 @@ start_link(Transport, Socket, Options) -> %%-------------------------------------------------------------------- %% @doc Get infos of the connection/channel. --spec(info(pid()|state()) -> emqx_types:infos()). +-spec(info(pid() | state()) -> emqx_types:infos()). info(CPid) when is_pid(CPid) -> call(CPid, info); info(State = #state{channel = Channel}) -> @@ -176,7 +176,7 @@ info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter). %% @doc Get stats of the connection/channel. --spec(stats(pid()|state()) -> emqx_types:stats()). +-spec(stats(pid() | state()) -> emqx_types:stats()). stats(CPid) when is_pid(CPid) -> call(CPid, stats); stats(#state{transport = Transport, @@ -373,7 +373,7 @@ cancel_stats_timer(State) -> State. process_msg([], State) -> {ok, State}; -process_msg([Msg|More], State) -> +process_msg([Msg | More], State) -> try case handle_msg(Msg, State) of ok -> @@ -475,7 +475,7 @@ handle_msg({Passive, _Sock}, State) handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{ listener = {Type, Listener}} = State) -> ActiveN = get_active_n(Type, Listener), - Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], + Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent @@ -649,7 +649,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> {Packets, State#state{parse_state = NParseState}}; {ok, Packet, Rest, NParseState} -> NState = State#state{parse_state = NParseState}, - parse_incoming(Rest, [Packet|Packets], NState) + parse_incoming(Rest, [Packet | Packets], NState) catch throw : ?FRAME_PARSE_ERROR(Reason) -> ?SLOG(info, #{ reason => Reason diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl index 87f50f92f..74411ee9c 100644 --- a/apps/emqx/src/emqx_stats.erl +++ b/apps/emqx/src/emqx_stats.erl @@ -233,7 +233,7 @@ handle_cast({update_interval, Update = #update{name = Name}}, name => Name }), State; - false -> State#state{updates = [Update|Updates]} + false -> State#state{updates = [Update | Updates]} end, {noreply, NState};