fix(emqx_lwm2m): limit the max size of cmd record
This commit is contained in:
parent
7aa08a82d5
commit
c9cbaeff95
|
@ -51,7 +51,8 @@
|
||||||
-type cmd_code_msg() :: binary().
|
-type cmd_code_msg() :: binary().
|
||||||
-type cmd_code_content() :: list(map()).
|
-type cmd_code_content() :: list(map()).
|
||||||
-type cmd_result() :: undefined | {cmd_code(), cmd_code_msg(), cmd_code_content()}.
|
-type cmd_result() :: undefined | {cmd_code(), cmd_code_msg(), cmd_code_content()}.
|
||||||
-type cmd_record() :: #{cmd_record_key() => cmd_result()}.
|
-type cmd_record() :: #{cmd_record_key() => cmd_result(),
|
||||||
|
queue := queue:queue()}.
|
||||||
|
|
||||||
-record(session, { coap :: emqx_coap_tm:manager()
|
-record(session, { coap :: emqx_coap_tm:manager()
|
||||||
, queue :: queue:queue(queued_request())
|
, queue :: queue:queue(queued_request())
|
||||||
|
@ -75,6 +76,8 @@
|
||||||
<<"7">>, <<"9">>, <<"15">>]).
|
<<"7">>, <<"9">>, <<"15">>]).
|
||||||
|
|
||||||
-define(CMD_KEY(Path, Type), {Path, Type}).
|
-define(CMD_KEY(Path, Type), {Path, Type}).
|
||||||
|
-define(MAX_RECORD_SIZE, 100).
|
||||||
|
-define(RECORD_SIZE(R), (erlang:map_size(R) - 1)).
|
||||||
|
|
||||||
%% uplink and downlink topic configuration
|
%% uplink and downlink topic configuration
|
||||||
-define(lwm2m_up_dm_topic, {<<"/v1/up/dm">>, 0}).
|
-define(lwm2m_up_dm_topic, {<<"/v1/up/dm">>, 0}).
|
||||||
|
@ -114,7 +117,7 @@ new() ->
|
||||||
, created_at = erlang:system_time(millisecond)
|
, created_at = erlang:system_time(millisecond)
|
||||||
, is_cache_mode = false
|
, is_cache_mode = false
|
||||||
, mountpoint = <<>>
|
, mountpoint = <<>>
|
||||||
, cmd_record = #{}
|
, cmd_record = #{queue => queue:new()}
|
||||||
, lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}.
|
, lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}.
|
||||||
|
|
||||||
-spec init(emqx_coap_message(), binary(), function(), session()) -> map().
|
-spec init(emqx_coap_message(), binary(), function(), session()) -> map().
|
||||||
|
@ -756,6 +759,17 @@ record_response(EventType, #{<<"data">> := Data}, Session) ->
|
||||||
Content = maps:get(<<"content">>, Data, undefined),
|
Content = maps:get(<<"content">>, Data, undefined),
|
||||||
record_cmd(ReqPath, EventType, {Code, CodeMsg, Content}, Session).
|
record_cmd(ReqPath, EventType, {Code, CodeMsg, Content}, Session).
|
||||||
|
|
||||||
record_cmd(Path, Type, Result, #session{cmd_record = Record} = Session) ->
|
record_cmd(Path, Type, Result, #session{cmd_record = #{queue := Queue} = Record} = Session) ->
|
||||||
Record2 = Record#{?CMD_KEY(Path, Type) => Result},
|
Key = ?CMD_KEY(Path, Type),
|
||||||
Session#session{cmd_record = Record2}.
|
Record2 = Record#{Key => Result},
|
||||||
|
Queue2 = queue:in(Key, Queue),
|
||||||
|
Record3 = check_record_size(Record2, Queue2),
|
||||||
|
Session#session{cmd_record = Record3}.
|
||||||
|
|
||||||
|
check_record_size(Record, Queue) when ?RECORD_SIZE(Record) =< ?MAX_RECORD_SIZE ->
|
||||||
|
Record#{queue := Queue};
|
||||||
|
|
||||||
|
check_record_size(Record, Queue) ->
|
||||||
|
{{value, Key}, Queue2} = queue:out(Queue),
|
||||||
|
Record2 = maps:remove(Key, Record),
|
||||||
|
Record2#{queue := Queue2}.
|
||||||
|
|
Loading…
Reference in New Issue