diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 78617c317..1b84a5209 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -245,8 +245,6 @@ filter_out_low_level_opts(Type, RawCfg = #{gw_conf := Conf0}) when ?IS_ESOCKD_LI acceptors, max_connections, max_conn_rate, - proxy_protocol, - proxy_protocol_timeout, tcp_options, ssl_options, udp_options, @@ -261,13 +259,10 @@ filter_out_low_level_opts(Type, RawCfg = #{gw_conf := Conf0}) when ?IS_COWBOY_LI acceptors, max_connections, max_conn_rate, - proxy_protocol, - proxy_protocol_timeout, tcp_options, ssl_options, udp_options, - dtls_options, - websocket + dtls_options ], Conf1 = maps:without(CowboyKeys, RawCfg), maps:merge(Conf0, Conf1). @@ -536,7 +531,7 @@ ranch_opts(Type, ListenOn, Opts) -> ws_opts(Opts, Conf) -> ConnMod = maps:get(connection_mod, Conf, emqx_gateway_conn), WsPaths = [ - {emqx_utils_maps:deep_get([websocket, path], Opts, "/"), ConnMod, Conf} + {emqx_utils_maps:deep_get([websocket, path], Opts, "") ++ "/[...]", ConnMod, Conf} ], Dispatch = cowboy_router:compile([{'_', WsPaths}]), ProxyProto = maps:get(proxy_protocol, Opts, false), diff --git a/apps/emqx_gateway_ocpp/BSL.txt b/apps/emqx_gateway_ocpp/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_gateway_ocpp/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl index 53c3a22ef..30b1cab3c 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl @@ -63,7 +63,7 @@ %% ClientInfo clientinfo :: emqx_types:clientinfo(), %% Session - session :: maybe(emqx_session:session()), + session :: maybe(map()), %% ClientInfo override specs clientinfo_override :: map(), %% Keepalive @@ -163,18 +163,41 @@ info(clientid, #channel{clientinfo = ClientInfo}) -> maps:get(clientid, ClientInfo, undefined); info(username, #channel{clientinfo = ClientInfo}) -> maps:get(username, ClientInfo, undefined); -info(session, #channel{session = Session}) -> - emqx_utils:maybe_apply(fun emqx_session:info/1, Session); +info(session, #channel{conninfo = ConnInfo}) -> + %% XXX: + #{ + created_at => maps:get(connected_at, ConnInfo, undefined), + is_persistent => false, + subscriptions => #{}, + upgrade_qos => false, + retry_interval => 0, + await_rel_timeout => 0 + }; info(conn_state, #channel{conn_state = ConnState}) -> ConnState; info(keepalive, #channel{keepalive = Keepalive}) -> emqx_utils:maybe_apply(fun emqx_ocpp_keepalive:info/1, Keepalive); +info(ctx, #channel{ctx = Ctx}) -> + Ctx; info(timers, #channel{timers = Timers}) -> Timers. -spec stats(channel()) -> emqx_types:stats(). -stats(#channel{session = Session}) -> - lists:append(emqx_session:stats(Session), emqx_pd:get_counters(?CHANNEL_METRICS)). +stats(#channel{mqueue = MQueue}) -> + %% XXX: + SessionStats = [ + {subscriptions_cnt, 0}, + {subscriptions_max, 0}, + {inflight_cnt, 0}, + {inflight_max, 0}, + {mqueue_len, queue:len(MQueue)}, + {mqueue_max, queue:len(MQueue)}, + {mqueue_dropped, 0}, + {next_pkt_id, 0}, + {awaiting_rel_cnt, 0}, + {awaiting_rel_max, 0} + ], + lists:append(SessionStats, emqx_pd:get_counters(?CHANNEL_METRICS)). %%-------------------------------------------------------------------- %% Init the channel @@ -300,9 +323,9 @@ enrich_client( fix_mountpoint(ClientInfo = #{mountpoint := undefined}) -> ClientInfo; -fix_mountpoint(ClientInfo = #{mountpoint := MountPoint}) -> - MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo), - ClientInfo#{mountpoint := MountPoint1}. +fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) -> + Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), + ClientInfo#{mountpoint := Mountpoint1}. set_log_meta(#channel{ clientinfo = #{clientid := ClientId}, @@ -353,14 +376,16 @@ publish( clientid := ClientId, username := Username, protocol := Protocol, - peerhost := PeerHost + peerhost := PeerHost, + mountpoint := Mountpoint }, conninfo = #{proto_ver := ProtoVer} } ) when is_map(Frame) -> - Topic = upstream_topic(Frame, Channel), + Topic0 = upstream_topic(Frame, Channel), + Topic = emqx_mountpoint:mount(Mountpoint, Topic0), Payload = frame2payload(Frame), emqx_broker:publish( emqx_message:make( @@ -386,14 +411,14 @@ upstream_topic( case Type of ?OCPP_MSG_TYPE_ID_CALL -> Action = maps:get(action, Frame), - emqx_placeholder:proc_tmpl( + proc_tmpl( emqx_ocpp_conf:uptopic(Action), Vars#{action => Action} ); ?OCPP_MSG_TYPE_ID_CALLRESULT -> - emqx_placeholder:proc_tmpl(emqx_ocpp_conf:up_reply_topic(), Vars); + proc_tmpl(emqx_ocpp_conf:up_reply_topic(), Vars); ?OCPP_MSG_TYPE_ID_CALLERROR -> - emqx_placeholder:proc_tmpl(emqx_ocpp_conf:up_error_topic(), Vars) + proc_tmpl(emqx_ocpp_conf:up_error_topic(), Vars) end. %%-------------------------------------------------------------------- @@ -589,27 +614,20 @@ process_connect( end. ensure_subscribe_dn_topics( - Channel = #channel{ - clientinfo = #{clientid := ClientId} = ClientInfo, - session = Session - } + Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint} = ClientInfo} ) -> - TopicTokens = emqx_ocpp_conf:dntopic(), SubOpts = #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1}, - Topic = emqx_placeholder:proc_tmpl( - TopicTokens, + Topic0 = proc_tmpl( + emqx_ocpp_conf:dntopic(), #{ clientid => ClientId, cid => ClientId } ), - {ok, NSession} = emqx_session:subscribe( - ClientInfo, - Topic, - SubOpts, - Session - ), - Channel#channel{session = NSession}. + Topic = emqx_mountpoint:mount(Mountpoint, Topic0), + ok = emqx_broker:subscribe(Topic, ClientId, SubOpts), + ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), + Channel. %%-------------------------------------------------------------------- %% Handle timeout @@ -694,10 +712,8 @@ terminate({shutdown, Reason}, Channel) when terminate(Reason, Channel) -> run_terminate_hook(Reason, Channel). -run_terminate_hook(_Reason, #channel{session = undefined}) -> - ok; -run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) -> - emqx_session:terminate(ClientInfo, Reason, Session). +run_terminate_hook(Reason, Channel = #channel{clientinfo = ClientInfo}) -> + emqx_hooks:run('session.terminated', [ClientInfo, Reason, info(session, Channel)]). %%-------------------------------------------------------------------- %% Internal functions @@ -795,12 +811,11 @@ ensure_disconnected( Reason, Channel = #channel{ conninfo = ConnInfo, - clientinfo = ClientInfo = #{clientid := ClientId} + clientinfo = ClientInfo } ) -> NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), - emqx_cm:unregister_channel(ClientId), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. %%-------------------------------------------------------------------- @@ -865,6 +880,10 @@ shutdown(success, Reply, Channel) -> shutdown(Reason, Reply, Channel) -> {shutdown, Reason, Reply, Channel}. +proc_tmpl(Tmpl, Vars) -> + Tokens = emqx_placeholder:preproc_tmpl(Tmpl), + emqx_placeholder:proc_tmpl(Tokens, Vars). + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_conf.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_conf.erl index 34e8ba763..1151e1dbb 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_conf.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_conf.erl @@ -17,18 +17,10 @@ %% Conf modules for emqx-ocpp gateway -module(emqx_ocpp_conf). --export([ - load/1, - unload/0, - get_env/1, - get_env/2 -]). - -export([ default_heartbeat_interval/0, heartbeat_checking_times_backoff/0, retry_interval/0, - awaiting_timeout/0, message_format_checking/0, max_mqueue_len/0, strit_mode/1, @@ -38,29 +30,18 @@ dntopic/0 ]). --define(KEY(Key), {?MODULE, Key}). +-define(KEY(K), [gateway, ocpp, K]). -load(Confs) -> - lists:foreach(fun({K, V}) -> store(K, V) end, Confs). - -get_env(K) -> - get_env(K, undefined). - -get_env(K, Default) -> - try - persistent_term:get(?KEY(K)) - catch - error:badarg -> - Default - end. +conf(K, Default) -> + emqx_config:get(?KEY(K), Default). -spec default_heartbeat_interval() -> pos_integer(). default_heartbeat_interval() -> - get_env(default_heartbeat_interval, 600). + conf(default_heartbeat_interval, 600). -spec heartbeat_checking_times_backoff() -> pos_integer(). heartbeat_checking_times_backoff() -> - get_env(heartbeat_checking_times_backoff, 1). + conf(heartbeat_checking_times_backoff, 1). -spec strit_mode(upstream | dnstream) -> boolean(). strit_mode(dnstream) -> @@ -76,21 +57,17 @@ retry_interval() -> max_mqueue_len() -> dnstream(max_mqueue_len, 10). --spec awaiting_timeout() -> pos_integer(). -awaiting_timeout() -> - upstream(awaiting_timeout, 30). - -spec message_format_checking() -> all | upstream_only | dnstream_only | disable. message_format_checking() -> - get_env(message_format_checking, all). + conf(message_format_checking, all). uptopic(Action) -> Topic = upstream(topic), - Mapping = upstream(mapping, #{}), + Mapping = upstream(topic_override_mapping, #{}), maps:get(Action, Mapping, Topic). up_reply_topic() -> @@ -102,16 +79,6 @@ up_error_topic() -> dntopic() -> dnstream(topic). --spec unload() -> ok. -unload() -> - lists:foreach( - fun - ({?KEY(K), _}) -> persistent_term:erase(?KEY(K)); - (_) -> ok - end, - persistent_term:get() - ). - %%-------------------------------------------------------------------- %% internal funcs %%-------------------------------------------------------------------- @@ -120,34 +87,10 @@ dnstream(K) -> dnstream(K, undefined). dnstream(K, Def) -> - L = get_env(dnstream, []), - proplists:get_value(K, L, Def). + emqx_config:get([gateway, ocpp, dnstream, K], Def). upstream(K) -> upstream(K, undefined). upstream(K, Def) -> - L = get_env(upstream, []), - proplists:get_value(K, L, Def). - -store(upstream, L) -> - L1 = preproc([topic, reply_topic, error_topic], L), - Mapping = proplists:get_value(mapping, L1, #{}), - NMappings = maps:map( - fun(_, V) -> emqx_placeholder:preproc_tmpl(V) end, - Mapping - ), - L2 = lists:keyreplace(mapping, 1, L1, {mapping, NMappings}), - persistent_term:put(?KEY(upstream), L2); -store(dnstream, L) -> - L1 = preproc([topic], L), - persistent_term:put(?KEY(dnstream), L1); -store(K, V) -> - persistent_term:put(?KEY(K), V). - -preproc([], L) -> - L; -preproc([Key | More], L) -> - Val0 = proplists:get_value(Key, L), - Val = emqx_placeholder:preproc_tmpl(Val0), - preproc(More, lists:keyreplace(Key, 1, L, {Key, Val})). + emqx_config:get([gateway, ocpp, upstream, K], Def). diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl index 2ebca7408..51389f6e4 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl @@ -204,13 +204,13 @@ call(WsPid, Req, Timeout) when is_pid(WsPid) -> init(Req, Opts) -> %% WS Transport Idle Timeout - IdleTimeout = proplists:get_value(idle_timeout, Opts, 7200000), + IdleTimeout = maps:get(idle_timeout, Opts, 7200000), MaxFrameSize = - case proplists:get_value(max_frame_size, Opts, 0) of + case maps:get(max_frame_size, Opts, 0) of 0 -> infinity; I -> I end, - Compress = proplists:get_bool(compress, Opts), + Compress = emqx_utils_maps:deep_get([websocket, compress], Opts), WsOpts = #{ compress => Compress, max_frame_size => MaxFrameSize, @@ -270,7 +270,7 @@ init_state_and_channel([Req, Opts, _WsOpts], _State = undefined) -> }, Limiter = undeined, ActiveN = emqx_gateway_utils:active_n(Opts), - Piggyback = proplists:get_value(piggyback, Opts, multiple), + Piggyback = emqx_utils_maps:deep_get([websocket, piggyback], Opts, multiple), ParseState = emqx_ocpp_frame:initial_parse_state(#{}), Serialize = emqx_ocpp_frame:serialize_opts(), Channel = emqx_ocpp_channel:init(ConnInfo, Opts), @@ -303,7 +303,7 @@ init_state_and_channel([Req, Opts, _WsOpts], _State = undefined) -> peername_and_cert(Req, Opts) -> case - proplists:get_bool(proxy_protocol, Opts) andalso + maps:get(proxy_protocol, Opts, false) andalso maps:get(proxy_header, Req) of #{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} -> @@ -323,8 +323,8 @@ peername_and_cert(Req, Opts) -> end. parse_sec_websocket_protocol([Req, Opts, WsOpts], State) -> - SupportedSubprotocols = proplists:get_value(supported_subprotocols, Opts), - FailIfNoSubprotocol = proplists:get_value(fail_if_no_subprotocol, Opts), + SupportedSubprotocols = emqx_utils_maps:deep_get([websocket, supported_subprotocols], Opts), + FailIfNoSubprotocol = emqx_utils_maps:deep_get([websocket, fail_if_no_subprotocol], Opts), case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of undefined -> case FailIfNoSubprotocol of @@ -402,7 +402,7 @@ auth_connect([Req, Opts, _WsOpts], State = #state{channel = Channel}) -> end. parse_clientid(Req, Opts) -> - PathPrefix = proplists:get_value(ocpp_path, Opts), + PathPrefix = emqx_utils_maps:deep_get([websocket, path], Opts), [_, ClientId0] = binary:split( cowboy_req:path(Req), iolist_to_binary(PathPrefix ++ "/") @@ -426,12 +426,12 @@ parse_protocol_name(<<"ocpp1.6">>) -> parse_header_fun_origin(Req, Opts) -> case cowboy_req:header(<<"origin">>, Req) of undefined -> - case proplists:get_bool(allow_origin_absence, Opts) of + case emqx_utils_maps:deep_get([websocket, allow_origin_absence], Opts) of true -> ok; false -> {error, origin_header_cannot_be_absent} end; Value -> - Origins = proplists:get_value(check_origins, Opts, []), + Origins = emqx_utils_maps:deep_get([websocket, check_origins], Opts, []), case lists:member(Value, Origins) of true -> ok; false -> {error, {origin_not_allowed, Value}} @@ -439,7 +439,7 @@ parse_header_fun_origin(Req, Opts) -> end. check_origin_header(Req, Opts) -> - case proplists:get_bool(check_origin_enable, Opts) of + case emqx_utils_maps:deep_get([websocket, check_origin_enable], Opts) of true -> parse_header_fun_origin(Req, Opts); false -> ok end. @@ -547,9 +547,15 @@ handle_info({connack, ConnAck}, State) -> handle_info({close, Reason}, State) -> ?SLOG(debug, #{msg => "force_to_close_socket", reason => Reason}), return(enqueue({close, Reason}, State)); -handle_info({event, connected}, State = #state{channel = Channel}) -> - ClientId = emqx_ocpp_channel:info(clientid, Channel), - emqx_cm:insert_channel_info(ClientId, info(State), stats(State)), +handle_info({event, connected}, State = #state{chann_mod = ChannMod, channel = Channel}) -> + Ctx = ChannMod:info(ctx, Channel), + ClientId = ChannMod:info(clientid, Channel), + emqx_gateway_ctx:insert_channel_info( + Ctx, + ClientId, + info(State), + stats(State) + ), return(State); handle_info({event, disconnected}, State = #state{chann_mod = ChannMod, channel = Channel}) -> Ctx = ChannMod:info(ctx, Channel), @@ -577,12 +583,14 @@ handle_timeout( TRef, emit_stats, State = #state{ + chann_mod = ChannMod, channel = Channel, stats_timer = TRef } ) -> - ClientId = emqx_ocpp_channel:info(clientid, Channel), - emqx_cm:set_chan_stats(ClientId, stats(State)), + Ctx = ChannMod:info(ctx, Channel), + ClientId = ChannMod:info(clientid, Channel), + emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)), return(State#state{stats_timer = undefined}); handle_timeout(TRef, TMsg, State) -> with_channel(handle_timeout, [TRef, TMsg], State). @@ -852,7 +860,9 @@ trigger(Event) -> erlang:send(self(), Event). get_peer(Req, Opts) -> {PeerAddr, PeerPort} = cowboy_req:peer(Req), - AddrHeader = cowboy_req:header(proplists:get_value(proxy_address_header, Opts), Req, <<>>), + AddrHeader = cowboy_req:header( + emqx_utils_maps:deep_get([websocket, proxy_address_header], Opts), Req, <<>> + ), ClientAddr = case string:tokens(binary_to_list(AddrHeader), ", ") of [] -> @@ -867,7 +877,9 @@ get_peer(Req, Opts) -> _ -> PeerAddr end, - PortHeader = cowboy_req:header(proplists:get_value(proxy_port_header, Opts), Req, <<>>), + PortHeader = cowboy_req:header( + emqx_utils_maps:deep_get([websocket, proxy_port_header], Opts), Req, <<>> + ), ClientPort = case string:tokens(binary_to_list(PortHeader), ", ") of [] -> diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_schemas.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_schemas.erl index fc5cc32e7..e2bd00d0e 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_schemas.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_schemas.erl @@ -32,7 +32,7 @@ load() -> disable -> ok; _ -> - case feedvar(emqx_ocpp_conf:get_env(json_schema_dir)) of + case feedvar(emqx_config:get([gateway, ocpp, json_schema_dir])) of undefined -> ok; Dir -> @@ -94,11 +94,11 @@ feedvar(Path) -> ). schema_id(?OCPP_MSG_TYPE_ID_CALL, Action) when is_binary(Action) -> - emqx_ocpp_conf:get_env(json_schema_id_prefix) ++ + emqx_config:get([gateway, ocpp, json_schema_id_prefix]) ++ binary_to_list(Action) ++ "Request"; schema_id(?OCPP_MSG_TYPE_ID_CALLRESULT, Action) when is_binary(Action) -> - emqx_ocpp_conf:get_env(json_schema_id_prefix) ++ + emqx_config:get([gateway, ocpp, json_schema_id_prefix]) ++ binary_to_list(Action) ++ "Response".