chore: fix elvis warnings
This commit is contained in:
parent
f194ae65d2
commit
2be33b33e3
|
@ -139,7 +139,7 @@ handle_call({subscribe, Topic, CoapPid}, _From, State=#state{sub_topics = TopicL
|
||||||
NewTopics = proplists:delete(Topic, TopicList),
|
NewTopics = proplists:delete(Topic, TopicList),
|
||||||
IsWild = emqx_topic:wildcard(Topic),
|
IsWild = emqx_topic:wildcard(Topic),
|
||||||
{reply, chann_subscribe(Topic, State), State#state{sub_topics =
|
{reply, chann_subscribe(Topic, State), State#state{sub_topics =
|
||||||
[{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate};
|
[{Topic, {IsWild, CoapPid}} | NewTopics]}, hibernate};
|
||||||
|
|
||||||
handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = TopicList}) ->
|
handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = TopicList}) ->
|
||||||
NewTopics = proplists:delete(Topic, TopicList),
|
NewTopics = proplists:delete(Topic, TopicList),
|
||||||
|
@ -281,7 +281,7 @@ do_deliver({Topic, Payload}, Subscribers) ->
|
||||||
|
|
||||||
deliver_to_coap(_TopicName, _Payload, []) ->
|
deliver_to_coap(_TopicName, _Payload, []) ->
|
||||||
ok;
|
ok;
|
||||||
deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}}|T]) ->
|
deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}} | T]) ->
|
||||||
Matched = case IsWild of
|
Matched = case IsWild of
|
||||||
true -> emqx_topic:match(TopicName, TopicFilter);
|
true -> emqx_topic:match(TopicName, TopicFilter);
|
||||||
false -> TopicName =:= TopicFilter
|
false -> TopicName =:= TopicFilter
|
||||||
|
|
|
@ -606,8 +606,6 @@ default_conninfo(ConnInfo) ->
|
||||||
ConnInfo#{clean_start => true,
|
ConnInfo#{clean_start => true,
|
||||||
clientid => undefined,
|
clientid => undefined,
|
||||||
username => undefined,
|
username => undefined,
|
||||||
proto_name => undefined,
|
|
||||||
proto_ver => undefined,
|
|
||||||
conn_props => #{},
|
conn_props => #{},
|
||||||
connected => true,
|
connected => true,
|
||||||
connected_at => erlang:system_time(millisecond),
|
connected_at => erlang:system_time(millisecond),
|
||||||
|
|
|
@ -74,7 +74,8 @@ call(Pid, Msg, Timeout) ->
|
||||||
Error -> {error, Error}
|
Error -> {error, Error}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) ->
|
init(CoapPid, EndpointName, Peername = {_Peerhost, _Port},
|
||||||
|
RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) ->
|
||||||
Mountpoint = proplists:get_value(mountpoint, lwm2m_coap_responder:options(), ""),
|
Mountpoint = proplists:get_value(mountpoint, lwm2m_coap_responder:options(), ""),
|
||||||
Lwm2mState = #lwm2m_state{peername = Peername,
|
Lwm2mState = #lwm2m_state{peername = Peername,
|
||||||
endpoint_name = EndpointName,
|
endpoint_name = EndpointName,
|
||||||
|
@ -103,9 +104,10 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">>
|
||||||
emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
|
emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
|
||||||
end),
|
end),
|
||||||
emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
|
emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
|
||||||
emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
|
emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
|
||||||
|
|
||||||
{ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
|
NTimer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired}),
|
||||||
|
{ok, Lwm2mState1#lwm2m_state{life_timer = NTimer}};
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
_ = run_hooks('client.connack', [conninfo(Lwm2mState), not_authorized], undefined),
|
_ = run_hooks('client.connack', [conninfo(Lwm2mState), not_authorized], undefined),
|
||||||
{error, Error}
|
{error, Error}
|
||||||
|
@ -133,7 +135,7 @@ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, regi
|
||||||
%% - report the registration info update, but only when objectList is updated.
|
%% - report the registration info update, but only when objectList is updated.
|
||||||
case NewRegInfo of
|
case NewRegInfo of
|
||||||
#{<<"objectList">> := _} ->
|
#{<<"objectList">> := _} ->
|
||||||
emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
|
emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
|
||||||
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
|
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end
|
end
|
||||||
|
@ -186,7 +188,8 @@ deliver(#message{topic = Topic, payload = Payload},
|
||||||
started_at = StartedAt,
|
started_at = StartedAt,
|
||||||
endpoint_name = EndpointName}) ->
|
endpoint_name = EndpointName}) ->
|
||||||
IsCacheMode = is_cache_mode(RegInfo, StartedAt),
|
IsCacheMode = is_cache_mode(RegInfo, StartedAt),
|
||||||
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
|
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, "
|
||||||
|
"Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
|
||||||
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
||||||
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
|
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
|
||||||
Lwm2mState.
|
Lwm2mState.
|
||||||
|
@ -256,7 +259,8 @@ time_now() -> erlang:system_time(millisecond).
|
||||||
%% Deliver downlink message to coap
|
%% Deliver downlink message to coap
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
|
deliver_to_coap(AlternatePath, JsonData,
|
||||||
|
CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
|
||||||
try
|
try
|
||||||
TermData = emqx_json:decode(JsonData, [return_maps]),
|
TermData = emqx_json:decode(JsonData, [return_maps]),
|
||||||
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
|
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
|
||||||
|
@ -285,7 +289,8 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when
|
||||||
send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
|
send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
|
||||||
do_send_to_broker(EventType, Payload, Lwm2mState).
|
do_send_to_broker(EventType, Payload, Lwm2mState).
|
||||||
|
|
||||||
do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
|
do_send_to_broker(EventType, #{<<"data">> := Data} = Payload,
|
||||||
|
#lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
|
||||||
ReqPath = maps:get(<<"reqPath">>, Data, undefined),
|
ReqPath = maps:get(<<"reqPath">>, Data, undefined),
|
||||||
Code = maps:get(<<"code">>, Data, undefined),
|
Code = maps:get(<<"code">>, Data, undefined),
|
||||||
CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
|
CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
|
||||||
|
@ -327,18 +332,27 @@ auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) ->
|
||||||
|
|
||||||
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
|
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
|
||||||
lists:foreach(fun(ObjectPath) ->
|
lists:foreach(fun(ObjectPath) ->
|
||||||
[ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
|
[ObjId | LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
|
||||||
case ObjId of
|
case ObjId of
|
||||||
<<"19">> ->
|
<<"19">> ->
|
||||||
[ObjInsId | _LastPath1] = LastPath,
|
[ObjInsId | _LastPath1] = LastPath,
|
||||||
case ObjInsId of
|
case ObjInsId of
|
||||||
<<"0">> ->
|
<<"0">> ->
|
||||||
observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName);
|
observe_object_slowly(
|
||||||
|
AlternatePath, <<"/19/0/0">>,
|
||||||
|
CoapPid, 100, EndpointName
|
||||||
|
);
|
||||||
_ ->
|
_ ->
|
||||||
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
|
observe_object_slowly(
|
||||||
|
AlternatePath, ObjectPath,
|
||||||
|
CoapPid, 100, EndpointName
|
||||||
|
)
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
|
observe_object_slowly(
|
||||||
|
AlternatePath, ObjectPath,
|
||||||
|
CoapPid, 100, EndpointName
|
||||||
|
)
|
||||||
end
|
end
|
||||||
end, ObjectList).
|
end, ObjectList).
|
||||||
|
|
||||||
|
@ -392,11 +406,12 @@ get_cached_downlink_messages() ->
|
||||||
is_cache_mode(RegInfo, StartedAt) ->
|
is_cache_mode(RegInfo, StartedAt) ->
|
||||||
case is_psm(RegInfo) orelse is_qmode(RegInfo) of
|
case is_psm(RegInfo) orelse is_qmode(RegInfo) of
|
||||||
true ->
|
true ->
|
||||||
QModeTimeWind = proplists:get_value(qmode_time_window, lwm2m_coap_responder:options(), 22),
|
QModeTimeWind = proplists:get_value(
|
||||||
Now = time_now(),
|
qmode_time_window,
|
||||||
if (Now - StartedAt) >= QModeTimeWind -> true;
|
lwm2m_coap_responder:options(),
|
||||||
true -> false
|
22
|
||||||
end;
|
),
|
||||||
|
(time_now() - StartedAt) >= QModeTimeWind;
|
||||||
false -> false
|
false -> false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -108,6 +108,8 @@
|
||||||
, init/2
|
, init/2
|
||||||
]}).
|
]}).
|
||||||
|
|
||||||
|
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
|
||||||
|
|
||||||
-type(pstate() :: #pstate{}).
|
-type(pstate() :: #pstate{}).
|
||||||
|
|
||||||
%% @doc Init protocol
|
%% @doc Init protocol
|
||||||
|
@ -132,8 +134,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
|
||||||
|
|
||||||
AllowAnonymous = get_value(allow_anonymous, Opts, false),
|
AllowAnonymous = get_value(allow_anonymous, Opts, false),
|
||||||
DefaultUser = get_value(default_user, Opts),
|
DefaultUser = get_value(default_user, Opts),
|
||||||
|
#pstate{
|
||||||
#pstate{
|
|
||||||
conninfo = NConnInfo,
|
conninfo = NConnInfo,
|
||||||
clientinfo = ClientInfo,
|
clientinfo = ClientInfo,
|
||||||
heartfun = HeartFun,
|
heartfun = HeartFun,
|
||||||
|
@ -165,7 +166,7 @@ default_conninfo(ConnInfo) ->
|
||||||
info(State) ->
|
info(State) ->
|
||||||
maps:from_list(info(?INFO_KEYS, State)).
|
maps:from_list(info(?INFO_KEYS, State)).
|
||||||
|
|
||||||
-spec info(list(atom())|atom(), pstate()) -> term().
|
-spec info(list(atom()) | atom(), pstate()) -> term().
|
||||||
info(Keys, State) when is_list(Keys) ->
|
info(Keys, State) when is_list(Keys) ->
|
||||||
[{Key, info(Key, State)} || Key <- Keys];
|
[{Key, info(Key, State)} || Key <- Keys];
|
||||||
info(conninfo, #pstate{conninfo = ConnInfo}) ->
|
info(conninfo, #pstate{conninfo = ConnInfo}) ->
|
||||||
|
@ -288,7 +289,12 @@ received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true
|
||||||
received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) ->
|
received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) ->
|
||||||
case header(<<"transaction">>, Headers) of
|
case header(<<"transaction">>, Headers) of
|
||||||
undefined -> {ok, handle_recv_send_frame(Frame, State)};
|
undefined -> {ok, handle_recv_send_frame(Frame, State)};
|
||||||
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_send_frame/2, [Frame]}, receipt_id(Headers), State)
|
TransactionId ->
|
||||||
|
add_action(TransactionId,
|
||||||
|
{fun ?MODULE:handle_recv_send_frame/2, [Frame]},
|
||||||
|
receipt_id(Headers),
|
||||||
|
State
|
||||||
|
)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
|
received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
|
||||||
|
@ -346,7 +352,11 @@ received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers},
|
||||||
received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
||||||
case header(<<"transaction">>, Headers) of
|
case header(<<"transaction">>, Headers) of
|
||||||
undefined -> {ok, handle_recv_ack_frame(Frame, State)};
|
undefined -> {ok, handle_recv_ack_frame(Frame, State)};
|
||||||
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_ack_frame/2, [Frame]}, receipt_id(Headers), State)
|
TransactionId ->
|
||||||
|
add_action(TransactionId,
|
||||||
|
{fun ?MODULE:handle_recv_ack_frame/2, [Frame]},
|
||||||
|
receipt_id(Headers),
|
||||||
|
State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% NACK
|
%% NACK
|
||||||
|
@ -357,7 +367,11 @@ received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
||||||
received(Frame = #stomp_frame{command = <<"NACK">>, headers = Headers}, State) ->
|
received(Frame = #stomp_frame{command = <<"NACK">>, headers = Headers}, State) ->
|
||||||
case header(<<"transaction">>, Headers) of
|
case header(<<"transaction">>, Headers) of
|
||||||
undefined -> {ok, handle_recv_nack_frame(Frame, State)};
|
undefined -> {ok, handle_recv_nack_frame(Frame, State)};
|
||||||
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_nack_frame/2, [Frame]}, receipt_id(Headers), State)
|
TransactionId ->
|
||||||
|
add_action(TransactionId,
|
||||||
|
{fun ?MODULE:handle_recv_nack_frame/2, [Frame]},
|
||||||
|
receipt_id(Headers),
|
||||||
|
State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% BEGIN
|
%% BEGIN
|
||||||
|
@ -516,9 +530,9 @@ negotiate_version(Accepts) ->
|
||||||
|
|
||||||
negotiate_version(Ver, []) ->
|
negotiate_version(Ver, []) ->
|
||||||
{error, <<"Supported protocol versions < ", Ver/binary>>};
|
{error, <<"Supported protocol versions < ", Ver/binary>>};
|
||||||
negotiate_version(Ver, [AcceptVer|_]) when Ver >= AcceptVer ->
|
negotiate_version(Ver, [AcceptVer | _]) when Ver >= AcceptVer ->
|
||||||
{ok, AcceptVer};
|
{ok, AcceptVer};
|
||||||
negotiate_version(Ver, [_|T]) ->
|
negotiate_version(Ver, [_ | T]) ->
|
||||||
negotiate_version(Ver, T).
|
negotiate_version(Ver, T).
|
||||||
|
|
||||||
check_login(Login, _, AllowAnonymous, _)
|
check_login(Login, _, AllowAnonymous, _)
|
||||||
|
@ -537,7 +551,7 @@ check_login(Login, Passcode, _, DefaultUser) ->
|
||||||
add_action(Id, Action, ReceiptId, State = #pstate{transaction = Trans}) ->
|
add_action(Id, Action, ReceiptId, State = #pstate{transaction = Trans}) ->
|
||||||
case maps:get(Id, Trans, undefined) of
|
case maps:get(Id, Trans, undefined) of
|
||||||
{Ts, Actions} ->
|
{Ts, Actions} ->
|
||||||
NTrans = Trans#{Id => {Ts, [Action|Actions]}},
|
NTrans = Trans#{Id => {Ts, [Action | Actions]}},
|
||||||
{ok, State#pstate{transaction = NTrans}};
|
{ok, State#pstate{transaction = NTrans}};
|
||||||
_ ->
|
_ ->
|
||||||
send(error_frame(ReceiptId, ["Transaction ", Id, " not found"]), State)
|
send(error_frame(ReceiptId, ["Transaction ", Id, " not found"]), State)
|
||||||
|
@ -713,7 +727,7 @@ find_sub_by_id(Id, Subs) ->
|
||||||
end, Subs),
|
end, Subs),
|
||||||
case maps:to_list(Found) of
|
case maps:to_list(Found) of
|
||||||
[] -> undefined;
|
[] -> undefined;
|
||||||
[Sub|_] -> Sub
|
[Sub | _] -> Sub
|
||||||
end.
|
end.
|
||||||
|
|
||||||
is_acl_enabled(_) ->
|
is_acl_enabled(_) ->
|
||||||
|
|
Loading…
Reference in New Issue