Line data Source code
1 : /*
2 : *
3 : * Copyright (c) 2021 Project CHIP Authors
4 : * All rights reserved.
5 : *
6 : * Licensed under the Apache License, Version 2.0 (the "License");
7 : * you may not use this file except in compliance with the License.
8 : * You may obtain a copy of the License at
9 : *
10 : * http://www.apache.org/licenses/LICENSE-2.0
11 : *
12 : * Unless required by applicable law or agreed to in writing, software
13 : * distributed under the License is distributed on an "AS IS" BASIS,
14 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 : * See the License for the specific language governing permissions and
16 : * limitations under the License.
17 : */
18 :
19 : /**
20 : * @file
21 : * This file defines the initiator side of a CHIP Read Interaction.
22 : *
23 : */
24 :
25 : #include <app/AppConfig.h>
26 : #include <app/InteractionModelEngine.h>
27 : #include <app/InteractionModelHelper.h>
28 : #include <app/ReadClient.h>
29 : #include <app/StatusResponse.h>
30 : #include <assert.h>
31 : #include <lib/core/TLVTypes.h>
32 : #include <lib/support/FibonacciUtils.h>
33 : #include <messaging/ReliableMessageMgr.h>
34 : #include <messaging/ReliableMessageProtocolConfig.h>
35 : #include <platform/LockTracker.h>
36 : #include <tracing/metric_event.h>
37 :
38 : #include <app-common/zap-generated/cluster-objects.h>
39 : #include <app-common/zap-generated/ids/Attributes.h>
40 : #include <app-common/zap-generated/ids/Clusters.h>
41 :
42 : namespace chip {
43 : namespace app {
44 :
45 : using Status = Protocols::InteractionModel::Status;
46 :
47 1122 : ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback,
48 1122 : InteractionType aInteractionType) :
49 1122 : mExchange(*this),
50 1122 : mpCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
51 2244 : mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
52 : {
53 1122 : assertChipStackLockedByCurrentThread();
54 :
55 1122 : mpExchangeMgr = apExchangeMgr;
56 1122 : mInteractionType = aInteractionType;
57 :
58 1122 : mpImEngine = apImEngine;
59 :
60 1122 : if (aInteractionType == InteractionType::Subscribe)
61 : {
62 317 : mpImEngine->AddReadClient(this);
63 : }
64 1122 : }
65 :
66 283 : void ReadClient::ClearActiveSubscriptionState()
67 : {
68 283 : mIsReporting = false;
69 283 : mWaitingForFirstPrimingReport = true;
70 283 : mPendingMoreChunks = false;
71 283 : mMinIntervalFloorSeconds = 0;
72 283 : mMaxInterval = 0;
73 283 : mSubscriptionId = 0;
74 283 : mIsResubscriptionScheduled = false;
75 :
76 283 : MoveToState(ClientState::Idle);
77 283 : }
78 :
79 585 : void ReadClient::StopResubscription()
80 : {
81 585 : CancelLivenessCheckTimer();
82 585 : CancelResubscribeTimer();
83 :
84 : // Only deallocate the paths if they are not already deallocated.
85 585 : if (mReadPrepareParams.mpAttributePathParamsList != nullptr || mReadPrepareParams.mpEventPathParamsList != nullptr ||
86 358 : mReadPrepareParams.mpDataVersionFilterList != nullptr)
87 : {
88 227 : mpCallback.OnDeallocatePaths(std::move(mReadPrepareParams));
89 : // Make sure we will never try to free those pointers again.
90 227 : mReadPrepareParams.mpAttributePathParamsList = nullptr;
91 227 : mReadPrepareParams.mAttributePathParamsListSize = 0;
92 227 : mReadPrepareParams.mpEventPathParamsList = nullptr;
93 227 : mReadPrepareParams.mEventPathParamsListSize = 0;
94 227 : mReadPrepareParams.mpDataVersionFilterList = nullptr;
95 227 : mReadPrepareParams.mDataVersionFilterListSize = 0;
96 : }
97 585 : }
98 :
99 1210 : ReadClient::~ReadClient()
100 : {
101 1122 : assertChipStackLockedByCurrentThread();
102 :
103 1122 : if (IsSubscriptionType())
104 : {
105 317 : StopResubscription();
106 :
107 : // Only remove ourselves from the engine's tracker list if we still continue to have a valid pointer to it.
108 : // This won't be the case if the engine shut down before this destructor was called (in which case, mpImEngine
109 : // will point to null)
110 : //
111 317 : if (mpImEngine)
112 : {
113 152 : mpImEngine->RemoveReadClient(this);
114 : }
115 : }
116 1210 : }
117 :
118 14 : uint32_t ReadClient::ComputeTimeTillNextSubscription()
119 : {
120 14 : uint32_t maxWaitTimeInMsec = 0;
121 14 : uint32_t waitTimeInMsec = 0;
122 14 : uint32_t minWaitTimeInMsec = 0;
123 :
124 14 : if (mNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX)
125 : {
126 14 : maxWaitTimeInMsec = GetFibonacciForIndex(mNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS;
127 : }
128 : else
129 : {
130 0 : maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS;
131 : }
132 :
133 14 : if (maxWaitTimeInMsec != 0)
134 : {
135 3 : minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100;
136 3 : waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec));
137 : }
138 :
139 14 : if (mMinimalResubscribeDelay.count() > waitTimeInMsec)
140 : {
141 0 : waitTimeInMsec = mMinimalResubscribeDelay.count();
142 : }
143 :
144 14 : return waitTimeInMsec;
145 : }
146 :
147 14 : CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional<SessionHandle> aNewSessionHandle,
148 : bool aReestablishCASE)
149 : {
150 14 : VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
151 :
152 : //
153 : // If we're establishing CASE, make sure we are not provided a new SessionHandle as well.
154 : //
155 14 : VerifyOrReturnError(!aReestablishCASE || !aNewSessionHandle.HasValue(), CHIP_ERROR_INVALID_ARGUMENT);
156 :
157 14 : if (aNewSessionHandle.HasValue())
158 : {
159 0 : mReadPrepareParams.mSessionHolder.Grab(aNewSessionHandle.Value());
160 : }
161 :
162 14 : mForceCaseOnNextResub = aReestablishCASE;
163 14 : if (mForceCaseOnNextResub && mReadPrepareParams.mSessionHolder)
164 : {
165 : // Mark our existing session defunct, so that we will try to
166 : // re-establish it when the timer fires (unless something re-establishes
167 : // before then).
168 0 : mReadPrepareParams.mSessionHolder->AsSecureSession()->MarkAsDefunct();
169 : }
170 :
171 14 : ReturnErrorOnFailure(
172 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
173 : System::Clock::Milliseconds32(aTimeTillNextResubscriptionMs), OnResubscribeTimerCallback, this));
174 14 : mIsResubscriptionScheduled = true;
175 :
176 14 : return CHIP_NO_ERROR;
177 : }
178 :
179 1024 : void ReadClient::Close(CHIP_ERROR aError, bool allowResubscription)
180 : {
181 1024 : if (IsReadType())
182 : {
183 741 : if (aError != CHIP_NO_ERROR)
184 : {
185 49 : mpCallback.OnError(aError);
186 : }
187 : }
188 : else
189 : {
190 283 : if (IsAwaitingInitialReport() || IsAwaitingSubscribeResponse())
191 : {
192 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, aError);
193 : }
194 :
195 283 : ClearActiveSubscriptionState();
196 283 : if (aError != CHIP_NO_ERROR)
197 : {
198 : //
199 : // We infer that re-subscription was requested by virtue of having a non-zero list of event OR attribute paths present
200 : // in mReadPrepareParams. This would only be the case if an application called SendAutoResubscribeRequest which
201 : // populates mReadPrepareParams with the values provided by the application.
202 : //
203 116 : if (allowResubscription &&
204 49 : (mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0))
205 : {
206 16 : CHIP_ERROR originalReason = aError;
207 :
208 16 : aError = mpCallback.OnResubscriptionNeeded(this, aError);
209 16 : if (aError == CHIP_NO_ERROR)
210 : {
211 16 : return;
212 : }
213 2 : if (aError == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT)
214 : {
215 2 : VerifyOrDie(originalReason == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT);
216 2 : ChipLogProgress(DataManagement, "ICD device is inactive mark subscription as InactiveICDSubscription");
217 2 : MoveToState(ClientState::InactiveICDSubscription);
218 2 : return;
219 : }
220 : }
221 :
222 : //
223 : // Either something bad happened when requesting resubscription or the application has decided to not
224 : // continue by returning an error. Let's convey the error back up to the application
225 : // and shut everything down.
226 : //
227 100 : mpCallback.OnError(aError);
228 : }
229 :
230 267 : StopResubscription();
231 : }
232 :
233 1008 : mExchange.Release();
234 :
235 1008 : mpCallback.OnDone(this);
236 : }
237 :
238 1969 : const char * ReadClient::GetStateStr() const
239 : {
240 : #if CHIP_DETAIL_LOGGING
241 1969 : switch (mState)
242 : {
243 283 : case ClientState::Idle:
244 283 : return "Idle";
245 1114 : case ClientState::AwaitingInitialReport:
246 1114 : return "AwaitingInitialReport";
247 288 : case ClientState::AwaitingSubscribeResponse:
248 288 : return "AwaitingSubscribeResponse";
249 282 : case ClientState::SubscriptionActive:
250 282 : return "SubscriptionActive";
251 2 : case ClientState::InactiveICDSubscription:
252 2 : return "InactiveICDSubscription";
253 : }
254 : #endif // CHIP_DETAIL_LOGGING
255 0 : return "N/A";
256 : }
257 :
258 1969 : void ReadClient::MoveToState(const ClientState aTargetState)
259 : {
260 1969 : mState = aTargetState;
261 1969 : ChipLogDetail(DataManagement, "%s ReadClient[%p]: Moving to [%10.10s]", __func__, this, GetStateStr());
262 1969 : }
263 :
264 869 : CHIP_ERROR ReadClient::SendRequest(ReadPrepareParams & aReadPrepareParams)
265 : {
266 869 : if (mInteractionType == InteractionType::Read)
267 : {
268 793 : return SendReadRequest(aReadPrepareParams);
269 : }
270 :
271 76 : if (mInteractionType == InteractionType::Subscribe)
272 : {
273 76 : return SendSubscribeRequest(aReadPrepareParams);
274 : }
275 :
276 0 : return CHIP_ERROR_INVALID_ARGUMENT;
277 : }
278 :
279 793 : CHIP_ERROR ReadClient::SendReadRequest(ReadPrepareParams & aReadPrepareParams)
280 : {
281 793 : CHIP_ERROR err = CHIP_NO_ERROR;
282 :
283 793 : ChipLogDetail(DataManagement, "%s ReadClient[%p]: Sending Read Request", __func__, this);
284 :
285 793 : VerifyOrReturnError(ClientState::Idle == mState, err = CHIP_ERROR_INCORRECT_STATE);
286 :
287 : Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList,
288 793 : aReadPrepareParams.mAttributePathParamsListSize);
289 793 : Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize);
290 : Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
291 793 : aReadPrepareParams.mDataVersionFilterListSize);
292 :
293 793 : System::PacketBufferHandle msgBuf;
294 793 : ReadRequestMessage::Builder request;
295 793 : System::PacketBufferTLVWriter writer;
296 :
297 793 : InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead);
298 793 : ReturnErrorOnFailure(request.Init(&writer));
299 :
300 793 : if (!attributePaths.empty())
301 : {
302 674 : AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests();
303 674 : ReturnErrorOnFailure(err = request.GetError());
304 674 : ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths));
305 : }
306 :
307 793 : if (!eventPaths.empty())
308 : {
309 317 : EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests();
310 317 : ReturnErrorOnFailure(err = request.GetError());
311 :
312 317 : ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));
313 :
314 317 : Optional<EventNumber> eventMin;
315 317 : ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
316 317 : if (eventMin.HasValue())
317 : {
318 314 : EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
319 314 : ReturnErrorOnFailure(err = request.GetError());
320 314 : ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
321 : }
322 : }
323 :
324 793 : ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError());
325 :
326 793 : bool encodedDataVersionList = false;
327 793 : TLV::TLVWriter backup;
328 793 : request.Checkpoint(backup);
329 793 : DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters();
330 793 : ReturnErrorOnFailure(request.GetError());
331 793 : if (!attributePaths.empty())
332 : {
333 674 : ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters,
334 : encodedDataVersionList));
335 : }
336 793 : ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead));
337 793 : if (encodedDataVersionList)
338 : {
339 80 : ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs());
340 : }
341 : else
342 : {
343 713 : request.Rollback(backup);
344 : }
345 :
346 793 : ReturnErrorOnFailure(request.EndOfReadRequestMessage());
347 793 : ReturnErrorOnFailure(writer.Finalize(&msgBuf));
348 :
349 793 : VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);
350 :
351 793 : auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
352 793 : VerifyOrReturnError(exchange != nullptr, err = CHIP_ERROR_NO_MEMORY);
353 :
354 793 : mExchange.Grab(exchange);
355 :
356 793 : if (aReadPrepareParams.mTimeout == System::Clock::kZero)
357 : {
358 793 : mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
359 : }
360 : else
361 : {
362 0 : mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
363 : }
364 :
365 793 : ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::ReadRequest, std::move(msgBuf),
366 : Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
367 :
368 793 : mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer();
369 793 : MoveToState(ClientState::AwaitingInitialReport);
370 :
371 793 : return CHIP_NO_ERROR;
372 793 : }
373 :
374 343 : CHIP_ERROR ReadClient::GenerateEventPaths(EventPathIBs::Builder & aEventPathsBuilder, const Span<EventPathParams> & aEventPaths)
375 : {
376 854 : for (auto & event : aEventPaths)
377 : {
378 511 : VerifyOrReturnError(event.IsValidEventPath(), CHIP_ERROR_IM_MALFORMED_EVENT_PATH_IB);
379 511 : EventPathIB::Builder & path = aEventPathsBuilder.CreatePath();
380 511 : ReturnErrorOnFailure(aEventPathsBuilder.GetError());
381 511 : ReturnErrorOnFailure(path.Encode(event));
382 : }
383 :
384 343 : return aEventPathsBuilder.EndOfEventPaths();
385 : }
386 :
387 984 : CHIP_ERROR ReadClient::GenerateAttributePaths(AttributePathIBs::Builder & aAttributePathIBsBuilder,
388 : const Span<AttributePathParams> & aAttributePaths)
389 : {
390 2982 : for (auto & attribute : aAttributePaths)
391 : {
392 2000 : VerifyOrReturnError(attribute.IsValidAttributePath(), CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB);
393 1998 : AttributePathIB::Builder & path = aAttributePathIBsBuilder.CreatePath();
394 1998 : ReturnErrorOnFailure(aAttributePathIBsBuilder.GetError());
395 1998 : ReturnErrorOnFailure(path.Encode(attribute));
396 : }
397 :
398 982 : return aAttributePathIBsBuilder.EndOfAttributePathIBs();
399 : }
400 :
401 975 : CHIP_ERROR ReadClient::BuildDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
402 : const Span<AttributePathParams> & aAttributePaths,
403 : const Span<DataVersionFilter> & aDataVersionFilters,
404 : bool & aEncodedDataVersionList)
405 : {
406 : #if CHIP_PROGRESS_LOGGING
407 975 : size_t encodedFilterCount = 0;
408 975 : size_t irrelevantFilterCount = 0;
409 975 : size_t skippedFilterCount = 0;
410 : #endif
411 3468 : for (auto & filter : aDataVersionFilters)
412 : {
413 2494 : VerifyOrReturnError(filter.IsValidDataVersionFilter(), CHIP_ERROR_INVALID_ARGUMENT);
414 :
415 : // If data version filter is for some cluster none of whose attributes are included in our paths, discard this filter.
416 2494 : bool intersected = false;
417 2506 : for (auto & path : aAttributePaths)
418 : {
419 2500 : if (path.IncludesAttributesInCluster(filter))
420 : {
421 2488 : intersected = true;
422 2488 : break;
423 : }
424 : }
425 :
426 2494 : if (!intersected)
427 : {
428 : #if CHIP_PROGRESS_LOGGING
429 6 : ++irrelevantFilterCount;
430 : #endif
431 6 : continue;
432 : }
433 :
434 2488 : TLV::TLVWriter backup;
435 2488 : aDataVersionFilterIBsBuilder.Checkpoint(backup);
436 2488 : CHIP_ERROR err = aDataVersionFilterIBsBuilder.EncodeDataVersionFilterIB(filter);
437 2488 : if (err == CHIP_NO_ERROR)
438 : {
439 : #if CHIP_PROGRESS_LOGGING
440 2487 : ++encodedFilterCount;
441 : #endif
442 2487 : aEncodedDataVersionList = true;
443 : }
444 1 : else if (err == CHIP_ERROR_NO_MEMORY || err == CHIP_ERROR_BUFFER_TOO_SMALL)
445 : {
446 : // Packet is full, ignore the rest of the list
447 1 : aDataVersionFilterIBsBuilder.Rollback(backup);
448 : #if CHIP_PROGRESS_LOGGING
449 1 : ssize_t nonSkippedFilterCount = &filter - aDataVersionFilters.data();
450 1 : skippedFilterCount = aDataVersionFilters.size() - static_cast<size_t>(nonSkippedFilterCount);
451 : #endif // CHIP_PROGRESS_LOGGING
452 1 : break;
453 : }
454 : else
455 : {
456 0 : return err;
457 : }
458 : }
459 :
460 975 : ChipLogProgress(DataManagement,
461 : "%lu data version filters provided, %lu not relevant, %lu encoded, %lu skipped due to lack of space",
462 : static_cast<unsigned long>(aDataVersionFilters.size()), static_cast<unsigned long>(irrelevantFilterCount),
463 : static_cast<unsigned long>(encodedFilterCount), static_cast<unsigned long>(skippedFilterCount));
464 975 : return CHIP_NO_ERROR;
465 : }
466 :
467 980 : CHIP_ERROR ReadClient::GenerateDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
468 : const Span<AttributePathParams> & aAttributePaths,
469 : const Span<DataVersionFilter> & aDataVersionFilters,
470 : bool & aEncodedDataVersionList)
471 : {
472 : // Give the callback a chance first, otherwise use the list we have, if any.
473 980 : ReturnErrorOnFailure(
474 : mpCallback.OnUpdateDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aEncodedDataVersionList));
475 :
476 980 : if (!aEncodedDataVersionList)
477 : {
478 975 : ReturnErrorOnFailure(BuildDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aDataVersionFilters,
479 : aEncodedDataVersionList));
480 : }
481 :
482 980 : return CHIP_NO_ERROR;
483 : }
484 :
485 3 : void ReadClient::OnActiveModeNotification()
486 : {
487 3 : VerifyOrDie(mpImEngine->InActiveReadClientList(this));
488 :
489 : // Note: this API only works when issuing subscription via SendAutoResubscribeRequest. When SendAutoResubscribeRequest is
490 : // called, either mEventPathParamsListSize or mAttributePathParamsListSize is not 0.
491 3 : VerifyOrReturn(mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0);
492 :
493 : // When we reach here, the subscription definitely exceeded the liveness timeout. Just continue the unfinished resubscription
494 : // logic in `OnLivenessTimeoutCallback`.
495 3 : if (IsInactiveICDSubscription())
496 : {
497 1 : TriggerResubscriptionForLivenessTimeout(CHIP_ERROR_TIMEOUT);
498 1 : return;
499 : }
500 :
501 : // If this API has been called, that means the subscription for this ReadClient is gone
502 : // on the server side (because otherwise the server would not have checked in with us).
503 : // Even if we think we have a live subscription, we are wrong, and should just forcibly time it
504 : // out and schedule a new one.
505 2 : if (!mIsResubscriptionScheduled)
506 : {
507 : // Closing will ultimately trigger ScheduleResubscription with the aReestablishCASE argument set to true, effectively
508 : // rendering the session defunct.
509 1 : Close(CHIP_ERROR_TIMEOUT);
510 1 : return;
511 : }
512 :
513 : // If we have already detected subscription loss and are waiting to try to re-subscribe,
514 : // now is a really good time to do it, since the server is listening.
515 1 : TriggerResubscribeIfScheduled("check-in message");
516 : }
517 :
518 3 : void ReadClient::OnPeerTypeChange(PeerType aType)
519 : {
520 3 : VerifyOrDie(mpImEngine->InActiveReadClientList(this));
521 :
522 3 : mIsPeerLIT = (aType == PeerType::kLITICD);
523 :
524 3 : ChipLogProgress(DataManagement, "Peer is now %s LIT ICD.", mIsPeerLIT ? "a" : "not a");
525 :
526 : // If the peer is no longer LIT and we were waiting for a check-in to try to resubscribe,
527 : // just try to resubscribe now, because a SIT is not going to send a check-in.
528 3 : if (!mIsPeerLIT && IsInactiveICDSubscription())
529 : {
530 0 : TriggerResubscriptionForLivenessTimeout(CHIP_ERROR_TIMEOUT);
531 : }
532 3 : }
533 :
534 2190 : CHIP_ERROR ReadClient::OnMessageReceived(Messaging::ExchangeContext * apExchangeContext, const PayloadHeader & aPayloadHeader,
535 : System::PacketBufferHandle && aPayload)
536 : {
537 2190 : CHIP_ERROR err = CHIP_NO_ERROR;
538 2190 : Status status = Status::InvalidAction;
539 2190 : VerifyOrExit(!IsIdle() && !IsInactiveICDSubscription(), err = CHIP_ERROR_INCORRECT_STATE);
540 :
541 2190 : if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::ReportData))
542 : {
543 1836 : err = ProcessReportData(std::move(aPayload), ReportType::kContinuingTransaction);
544 : }
545 354 : else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::SubscribeResponse))
546 : {
547 284 : ChipLogProgress(DataManagement, "SubscribeResponse is received");
548 284 : VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
549 284 : err = ProcessSubscribeResponse(std::move(aPayload));
550 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, err);
551 : }
552 70 : else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::StatusResponse))
553 : {
554 138 : VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
555 70 : CHIP_ERROR statusError = CHIP_NO_ERROR;
556 70 : SuccessOrExit(err = StatusResponse::ProcessStatusResponse(std::move(aPayload), statusError));
557 70 : SuccessOrExit(err = statusError);
558 2 : err = CHIP_ERROR_INVALID_MESSAGE_TYPE;
559 : }
560 : else
561 : {
562 0 : err = CHIP_ERROR_INVALID_MESSAGE_TYPE;
563 : }
564 :
565 2190 : exit:
566 2190 : if (err != CHIP_NO_ERROR)
567 : {
568 76 : if (err == CHIP_ERROR_INVALID_SUBSCRIPTION)
569 : {
570 4 : status = Status::InvalidSubscription;
571 : }
572 76 : StatusResponse::Send(status, apExchangeContext, false /*aExpectResponse*/);
573 : }
574 :
575 2190 : if ((!IsSubscriptionType() && !mPendingMoreChunks) || err != CHIP_NO_ERROR)
576 : {
577 768 : Close(err);
578 : }
579 :
580 2190 : return err;
581 : }
582 :
583 82 : void ReadClient::OnUnsolicitedReportData(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload)
584 : {
585 82 : Status status = Status::Success;
586 82 : mExchange.Grab(apExchangeContext);
587 :
588 : //
589 : // Let's update the session we're tracking in our SessionHolder to that associated with the message that was just received.
590 : // This CAN be different from the one we were tracking before, since the server is permitted to send exchanges on any valid
591 : // session to us, of which there could be multiple.
592 : //
593 : // Since receipt of a message is proof of a working session on the peer, it's always best to update to that if possible
594 : // to maximize our chances of success later.
595 : //
596 82 : mReadPrepareParams.mSessionHolder.Grab(mExchange->GetSessionHandle());
597 :
598 82 : CHIP_ERROR err = ProcessReportData(std::move(aPayload), ReportType::kUnsolicited);
599 82 : if (err != CHIP_NO_ERROR)
600 : {
601 0 : if (err == CHIP_ERROR_INVALID_SUBSCRIPTION)
602 : {
603 0 : status = Status::InvalidSubscription;
604 : }
605 : else
606 : {
607 0 : status = Status::InvalidAction;
608 : }
609 :
610 0 : StatusResponse::Send(status, mExchange.Get(), false /*aExpectResponse*/);
611 0 : Close(err);
612 : }
613 82 : }
614 :
615 1926 : CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload, ReportType aReportType)
616 : {
617 1926 : CHIP_ERROR err = CHIP_NO_ERROR;
618 1926 : ReportDataMessage::Parser report;
619 1926 : bool suppressResponse = true;
620 1926 : SubscriptionId subscriptionId = 0;
621 1926 : EventReportIBs::Parser eventReportIBs;
622 1926 : AttributeReportIBs::Parser attributeReportIBs;
623 1926 : System::PacketBufferTLVReader reader;
624 1926 : reader.Init(std::move(aPayload));
625 1926 : err = report.Init(reader);
626 1926 : SuccessOrExit(err);
627 :
628 : #if CHIP_CONFIG_IM_PRETTY_PRINT
629 1924 : if (aReportType != ReportType::kUnsolicited)
630 : {
631 1842 : report.PrettyPrint();
632 : }
633 : #endif
634 :
635 1924 : err = report.GetSuppressResponse(&suppressResponse);
636 1924 : if (CHIP_END_OF_TLV == err)
637 : {
638 1224 : suppressResponse = false;
639 1224 : err = CHIP_NO_ERROR;
640 : }
641 1924 : SuccessOrExit(err);
642 :
643 1924 : err = report.GetSubscriptionId(&subscriptionId);
644 1924 : if (CHIP_NO_ERROR == err)
645 : {
646 420 : VerifyOrExit(IsSubscriptionType(), err = CHIP_ERROR_INVALID_ARGUMENT);
647 418 : if (mWaitingForFirstPrimingReport)
648 : {
649 286 : mSubscriptionId = subscriptionId;
650 : }
651 132 : else if (!IsMatchingSubscriptionId(subscriptionId))
652 : {
653 2 : err = CHIP_ERROR_INVALID_SUBSCRIPTION;
654 : }
655 : }
656 1504 : else if (CHIP_END_OF_TLV == err)
657 : {
658 1504 : if (IsSubscriptionType())
659 : {
660 0 : err = CHIP_ERROR_INVALID_ARGUMENT;
661 : }
662 : else
663 : {
664 1504 : err = CHIP_NO_ERROR;
665 : }
666 : }
667 1922 : SuccessOrExit(err);
668 :
669 1920 : err = report.GetMoreChunkedMessages(&mPendingMoreChunks);
670 1920 : if (CHIP_END_OF_TLV == err)
671 : {
672 1054 : mPendingMoreChunks = false;
673 1054 : err = CHIP_NO_ERROR;
674 : }
675 1920 : SuccessOrExit(err);
676 :
677 1920 : err = report.GetEventReports(&eventReportIBs);
678 1920 : if (err == CHIP_END_OF_TLV)
679 : {
680 1280 : err = CHIP_NO_ERROR;
681 : }
682 640 : else if (err == CHIP_NO_ERROR)
683 : {
684 640 : chip::TLV::TLVReader EventReportsReader;
685 640 : eventReportIBs.GetReader(&EventReportsReader);
686 640 : err = ProcessEventReportIBs(EventReportsReader);
687 : }
688 1920 : SuccessOrExit(err);
689 :
690 1920 : err = report.GetAttributeReportIBs(&attributeReportIBs);
691 1920 : if (err == CHIP_END_OF_TLV)
692 : {
693 631 : err = CHIP_NO_ERROR;
694 : }
695 1289 : else if (err == CHIP_NO_ERROR)
696 : {
697 1289 : TLV::TLVReader attributeReportIBsReader;
698 1289 : attributeReportIBs.GetReader(&attributeReportIBsReader);
699 1289 : err = ProcessAttributeReportIBs(attributeReportIBsReader);
700 : }
701 1920 : SuccessOrExit(err);
702 :
703 1918 : if (mIsReporting && !mPendingMoreChunks)
704 : {
705 1023 : mpCallback.OnReportEnd();
706 1023 : mIsReporting = false;
707 : }
708 :
709 1918 : SuccessOrExit(err = report.ExitContainer());
710 :
711 1926 : exit:
712 1926 : if (IsSubscriptionType())
713 : {
714 420 : if (IsAwaitingInitialReport())
715 : {
716 288 : MoveToState(ClientState::AwaitingSubscribeResponse);
717 : }
718 132 : else if (IsSubscriptionActive() && err == CHIP_NO_ERROR)
719 : {
720 : //
721 : // Only refresh the liveness check timer if we've successfully established
722 : // a subscription and have a valid value for mMaxInterval which the function
723 : // relies on.
724 : //
725 103 : mpCallback.NotifySubscriptionStillActive(*this);
726 103 : err = RefreshLivenessCheckTimer();
727 : }
728 : }
729 :
730 1926 : if (!suppressResponse && err == CHIP_NO_ERROR)
731 : {
732 1222 : bool noResponseExpected = IsSubscriptionActive() && !mPendingMoreChunks;
733 1222 : err = StatusResponse::Send(Status::Success, mExchange.Get(), !noResponseExpected);
734 : }
735 :
736 1926 : mWaitingForFirstPrimingReport = false;
737 3852 : return err;
738 1926 : }
739 :
740 12 : void ReadClient::OnResponseTimeout(Messaging::ExchangeContext * apExchangeContext)
741 : {
742 12 : ChipLogError(DataManagement, "Time out! failed to receive report data from Exchange: " ChipLogFormatExchange,
743 : ChipLogValueExchange(apExchangeContext));
744 :
745 12 : Close(CHIP_ERROR_TIMEOUT);
746 12 : }
747 :
748 5 : CHIP_ERROR ReadClient::ReadICDOperatingModeFromAttributeDataIB(TLV::TLVReader && aReader, PeerType & aType)
749 : {
750 : Clusters::IcdManagement::Attributes::OperatingMode::TypeInfo::DecodableType operatingMode;
751 :
752 5 : CHIP_ERROR err = DataModel::Decode(aReader, operatingMode);
753 5 : ReturnErrorOnFailure(err);
754 :
755 5 : switch (operatingMode)
756 : {
757 4 : case Clusters::IcdManagement::OperatingModeEnum::kSit:
758 4 : aType = PeerType::kNormal;
759 4 : break;
760 1 : case Clusters::IcdManagement::OperatingModeEnum::kLit:
761 1 : aType = PeerType::kLITICD;
762 1 : break;
763 0 : default:
764 0 : err = CHIP_ERROR_INVALID_ARGUMENT;
765 0 : break;
766 : }
767 :
768 5 : return err;
769 : }
770 :
771 5218 : CHIP_ERROR ReadClient::ProcessAttributePath(AttributePathIB::Parser & aAttributePathParser,
772 : ConcreteDataAttributePath & aAttributePath)
773 : {
774 : // The ReportData must contain a concrete attribute path. Don't validate ID
775 : // ranges here, so we can tell apart "malformed data" and "out of range
776 : // IDs".
777 5218 : CHIP_ERROR err = CHIP_NO_ERROR;
778 : // The ReportData must contain a concrete attribute path
779 5218 : err = aAttributePathParser.GetConcreteAttributePath(aAttributePath, AttributePathIB::ValidateIdRanges::kNo);
780 5218 : VerifyOrReturnError(err == CHIP_NO_ERROR, CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB);
781 5216 : return CHIP_NO_ERROR;
782 : }
783 :
784 6791 : void ReadClient::NoteReportingData()
785 : {
786 6791 : if (!mIsReporting)
787 : {
788 1077 : mpCallback.OnReportBegin();
789 1077 : mIsReporting = true;
790 : }
791 6791 : }
792 :
793 1289 : CHIP_ERROR ReadClient::ProcessAttributeReportIBs(TLV::TLVReader & aAttributeReportIBsReader)
794 : {
795 1289 : CHIP_ERROR err = CHIP_NO_ERROR;
796 6505 : while (CHIP_NO_ERROR == (err = aAttributeReportIBsReader.Next()))
797 : {
798 5218 : TLV::TLVReader dataReader;
799 5218 : AttributeReportIB::Parser report;
800 5218 : AttributeDataIB::Parser data;
801 5218 : AttributeStatusIB::Parser status;
802 5218 : AttributePathIB::Parser path;
803 5218 : ConcreteDataAttributePath attributePath;
804 5218 : StatusIB statusIB;
805 :
806 5218 : TLV::TLVReader reader = aAttributeReportIBsReader;
807 5218 : ReturnErrorOnFailure(report.Init(reader));
808 :
809 5218 : err = report.GetAttributeStatus(&status);
810 5218 : if (CHIP_NO_ERROR == err)
811 : {
812 9 : StatusIB::Parser errorStatus;
813 9 : ReturnErrorOnFailure(status.GetPath(&path));
814 9 : ReturnErrorOnFailure(ProcessAttributePath(path, attributePath));
815 9 : if (!attributePath.IsValid())
816 : {
817 : // Don't fail the entire read or subscription when there is an
818 : // out-of-range ID. Just skip that one AttributeReportIB.
819 0 : ChipLogError(DataManagement,
820 : "Skipping AttributeStatusIB with out-of-range IDs: (%d, " ChipLogFormatMEI ", " ChipLogFormatMEI ") ",
821 : attributePath.mEndpointId, ChipLogValueMEI(attributePath.mClusterId),
822 : ChipLogValueMEI(attributePath.mAttributeId));
823 0 : continue;
824 : }
825 :
826 9 : ReturnErrorOnFailure(status.GetErrorStatus(&errorStatus));
827 9 : ReturnErrorOnFailure(errorStatus.DecodeStatusIB(statusIB));
828 9 : NoteReportingData();
829 9 : mpCallback.OnAttributeData(attributePath, nullptr, statusIB);
830 : }
831 5209 : else if (CHIP_END_OF_TLV == err)
832 : {
833 5209 : ReturnErrorOnFailure(report.GetAttributeData(&data));
834 5209 : ReturnErrorOnFailure(data.GetPath(&path));
835 5209 : ReturnErrorOnFailure(ProcessAttributePath(path, attributePath));
836 5207 : if (!attributePath.IsValid())
837 : {
838 : // Don't fail the entire read or subscription when there is an
839 : // out-of-range ID. Just skip that one AttributeReportIB.
840 2 : ChipLogError(DataManagement,
841 : "Skipping AttributeDataIB with out-of-range IDs: (%d, " ChipLogFormatMEI ", " ChipLogFormatMEI ") ",
842 : attributePath.mEndpointId, ChipLogValueMEI(attributePath.mClusterId),
843 : ChipLogValueMEI(attributePath.mAttributeId));
844 2 : continue;
845 : }
846 :
847 5205 : DataVersion version = 0;
848 5205 : ReturnErrorOnFailure(data.GetDataVersion(&version));
849 5205 : attributePath.mDataVersion.SetValue(version);
850 :
851 5205 : if (mReadPrepareParams.mpDataVersionFilterList != nullptr)
852 : {
853 65 : UpdateDataVersionFilters(attributePath);
854 : }
855 :
856 5205 : ReturnErrorOnFailure(data.GetData(&dataReader));
857 :
858 : // The element in an array may be another array -- so we should only set the list operation when we are handling the
859 : // whole list.
860 5205 : if (!attributePath.IsListOperation() && dataReader.GetType() == TLV::kTLVType_Array)
861 : {
862 2975 : attributePath.mListOp = ConcreteDataAttributePath::ListOperation::ReplaceAll;
863 : }
864 :
865 5205 : if (attributePath.MatchesConcreteAttributePath(ConcreteAttributePath(
866 : kRootEndpointId, Clusters::IcdManagement::Id, Clusters::IcdManagement::Attributes::OperatingMode::Id)))
867 : {
868 : PeerType peerType;
869 5 : TLV::TLVReader operatingModeTlvReader;
870 5 : operatingModeTlvReader.Init(dataReader);
871 5 : if (CHIP_NO_ERROR == ReadICDOperatingModeFromAttributeDataIB(std::move(operatingModeTlvReader), peerType))
872 : {
873 : // It is safe to call `OnPeerTypeChange` since we are in the middle of parsing the attribute data, And
874 : // the subscription should be active so `OnActiveModeNotification` is a no-op in this case.
875 5 : InteractionModelEngine::GetInstance()->OnPeerTypeChange(mPeer, peerType);
876 : }
877 : else
878 : {
879 0 : ChipLogError(DataManagement, "Failed to get ICD state from attribute data with error'%" CHIP_ERROR_FORMAT "'",
880 : err.Format());
881 : }
882 : }
883 :
884 5205 : NoteReportingData();
885 5205 : mpCallback.OnAttributeData(attributePath, &dataReader, statusIB);
886 : }
887 : }
888 :
889 1287 : if (CHIP_END_OF_TLV == err)
890 : {
891 1287 : err = CHIP_NO_ERROR;
892 : }
893 :
894 1287 : return err;
895 : }
896 :
897 640 : CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsReader)
898 : {
899 640 : CHIP_ERROR err = CHIP_NO_ERROR;
900 2217 : while (CHIP_NO_ERROR == (err = aEventReportIBsReader.Next()))
901 : {
902 1577 : TLV::TLVReader dataReader;
903 1577 : EventReportIB::Parser report;
904 1577 : EventDataIB::Parser data;
905 1577 : EventHeader header;
906 1577 : StatusIB statusIB; // Default value for statusIB is success.
907 :
908 1577 : TLV::TLVReader reader = aEventReportIBsReader;
909 1577 : ReturnErrorOnFailure(report.Init(reader));
910 :
911 1577 : err = report.GetEventData(&data);
912 :
913 1577 : if (err == CHIP_NO_ERROR)
914 : {
915 1575 : header.mTimestamp = mEventTimestamp;
916 1575 : ReturnErrorOnFailure(data.DecodeEventHeader(header));
917 1575 : mEventTimestamp = header.mTimestamp;
918 :
919 1575 : ReturnErrorOnFailure(data.GetData(&dataReader));
920 :
921 : //
922 : // Update the event number being tracked in mReadPrepareParams in case
923 : // we want to send it in the next SubscribeRequest message to convey
924 : // the event number for which we have already received an event.
925 : //
926 1575 : mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1);
927 :
928 1575 : NoteReportingData();
929 1575 : mpCallback.OnEventData(header, &dataReader, nullptr);
930 : }
931 2 : else if (err == CHIP_END_OF_TLV)
932 : {
933 2 : EventStatusIB::Parser status;
934 2 : EventPathIB::Parser pathIB;
935 2 : StatusIB::Parser statusIBParser;
936 2 : ReturnErrorOnFailure(report.GetEventStatus(&status));
937 2 : ReturnErrorOnFailure(status.GetPath(&pathIB));
938 2 : ReturnErrorOnFailure(pathIB.GetEventPath(&header.mPath));
939 2 : ReturnErrorOnFailure(status.GetErrorStatus(&statusIBParser));
940 2 : ReturnErrorOnFailure(statusIBParser.DecodeStatusIB(statusIB));
941 :
942 2 : NoteReportingData();
943 2 : mpCallback.OnEventData(header, nullptr, &statusIB);
944 : }
945 : }
946 :
947 640 : if (CHIP_END_OF_TLV == err)
948 : {
949 640 : err = CHIP_NO_ERROR;
950 : }
951 :
952 640 : return err;
953 : }
954 :
955 0 : void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout)
956 : {
957 0 : mLivenessTimeoutOverride = aLivenessTimeout;
958 0 : auto err = RefreshLivenessCheckTimer();
959 0 : if (err != CHIP_NO_ERROR)
960 : {
961 0 : Close(err);
962 : }
963 0 : }
964 :
965 385 : CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
966 : {
967 385 : CHIP_ERROR err = CHIP_NO_ERROR;
968 :
969 385 : VerifyOrReturnError(IsSubscriptionActive(), CHIP_ERROR_INCORRECT_STATE);
970 :
971 385 : CancelLivenessCheckTimer();
972 :
973 : System::Clock::Timeout timeout;
974 385 : ReturnErrorOnFailure(ComputeLivenessCheckTimerTimeout(&timeout));
975 :
976 : // EFR32/MBED/INFINION/K32W's chrono count return long unsigned, but other platform returns unsigned
977 385 : ChipLogProgress(
978 : DataManagement,
979 : "Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32 " Peer = %02x:" ChipLogFormatX64,
980 : static_cast<long unsigned>(timeout.count()), mSubscriptionId, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()));
981 385 : err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
982 : timeout, OnLivenessTimeoutCallback, this);
983 :
984 385 : return err;
985 : }
986 :
987 385 : CHIP_ERROR ReadClient::ComputeLivenessCheckTimerTimeout(System::Clock::Timeout * aTimeout)
988 : {
989 385 : if (mLivenessTimeoutOverride != System::Clock::kZero)
990 : {
991 0 : *aTimeout = mLivenessTimeoutOverride;
992 0 : return CHIP_NO_ERROR;
993 : }
994 :
995 385 : VerifyOrReturnError(mReadPrepareParams.mSessionHolder, CHIP_ERROR_INCORRECT_STATE);
996 :
997 : //
998 : // To calculate the duration we're willing to wait for a report to come to us, we take into account the maximum interval of
999 : // the subscription AND the time it takes for the report to make it to us in the worst case.
1000 : //
1001 : // We have no way to estimate what the network latency will be, but we do know the other side will time out its ReportData
1002 : // after its computed round-trip timeout plus the processing time it gives us (app::kExpectedIMProcessingTime). Once it
1003 : // times out, assuming it sent the report at all, there's no point in us thinking we still have a subscription.
1004 : //
1005 : // We can't use ComputeRoundTripTimeout() on the session for two reasons: we want the roundtrip timeout from the point of
1006 : // view of the peer, not us, and we want to start off with the assumption the peer will likely have, which is that we are
1007 : // idle, whereas ComputeRoundTripTimeout() uses the current activity state of the peer.
1008 : //
1009 : // So recompute the round-trip timeout directly. Assume MRP, since in practice that is likely what is happening.
1010 385 : auto & peerMRPConfig = mReadPrepareParams.mSessionHolder->GetRemoteMRPConfig();
1011 : // Peer will assume we are idle (hence we pass kZero to GetMessageReceiptTimeout()), but will assume we treat it as active
1012 : // for the response, so to match the retransmission timeout computation for the message back to the peer, we should treat
1013 : // it as active and handling non-initial message, isFirstMessageOnExchange needs to be set as false for
1014 : // GetRetransmissionTimeout.
1015 : auto roundTripTimeout =
1016 385 : mReadPrepareParams.mSessionHolder->GetMessageReceiptTimeout(System::Clock::kZero, true /*isFirstMessageOnExchange*/) +
1017 385 : kExpectedIMProcessingTime +
1018 385 : GetRetransmissionTimeout(peerMRPConfig.mActiveRetransTimeout, peerMRPConfig.mIdleRetransTimeout,
1019 385 : System::SystemClock().GetMonotonicTimestamp(), peerMRPConfig.mActiveThresholdTime,
1020 385 : false /*isFirstMessageOnExchange*/);
1021 385 : *aTimeout = System::Clock::Seconds16(mMaxInterval) + roundTripTimeout;
1022 385 : return CHIP_NO_ERROR;
1023 : }
1024 :
1025 970 : void ReadClient::CancelLivenessCheckTimer()
1026 : {
1027 970 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
1028 : OnLivenessTimeoutCallback, this);
1029 970 : }
1030 :
1031 586 : void ReadClient::CancelResubscribeTimer()
1032 : {
1033 586 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
1034 : OnResubscribeTimerCallback, this);
1035 586 : mIsResubscriptionScheduled = false;
1036 586 : }
1037 :
1038 8 : void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState)
1039 : {
1040 8 : ReadClient * const _this = reinterpret_cast<ReadClient *>(apAppState);
1041 :
1042 : // TODO: add a more specific error here for liveness timeout failure to distinguish between other classes of timeouts (i.e
1043 : // response timeouts).
1044 8 : CHIP_ERROR subscriptionTerminationCause = CHIP_ERROR_TIMEOUT;
1045 :
1046 : //
1047 : // Might as well try to see if this instance exists in the tracked list in the IM.
1048 : // This might blow-up if either the client has since been free'ed (use-after-free), or if the engine has since
1049 : // been shutdown at which point the client wouldn't exist in the active read client list.
1050 : //
1051 8 : VerifyOrDie(_this->mpImEngine->InActiveReadClientList(_this));
1052 :
1053 8 : ChipLogError(DataManagement,
1054 : "Subscription Liveness timeout with SubscriptionID = 0x%08" PRIx32 ", Peer = %02x:" ChipLogFormatX64,
1055 : _this->mSubscriptionId, _this->GetFabricIndex(), ChipLogValueX64(_this->GetPeerNodeId()));
1056 :
1057 : // If subscription client is able to handle check-in messages and peer operation mode is LIT,
1058 : // use CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT as subscriptionTerminationCause.
1059 : // This will cause us to wait for a check-in message before trying to re-subscribe, instead of trying
1060 : // (and probably failing, because we are dealing with a LIT ICD) off a timer.
1061 8 : if (_this->mIsPeerLIT && _this->mReadPrepareParams.mRegisteredCheckInToken)
1062 : {
1063 3 : subscriptionTerminationCause = CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT;
1064 : }
1065 :
1066 8 : _this->TriggerResubscriptionForLivenessTimeout(subscriptionTerminationCause);
1067 8 : }
1068 :
1069 9 : void ReadClient::TriggerResubscriptionForLivenessTimeout(CHIP_ERROR aReason)
1070 : {
1071 : // We didn't get a message from the server on time; it's possible that it no
1072 : // longer has a useful CASE session to us. Mark defunct all sessions that
1073 : // have not seen peer activity in at least as long as our session.
1074 9 : const auto & holder = mReadPrepareParams.mSessionHolder;
1075 9 : if (holder)
1076 : {
1077 9 : System::Clock::Timestamp lastPeerActivity = holder->AsSecureSession()->GetLastPeerActivityTime();
1078 9 : mpImEngine->GetExchangeManager()->GetSessionManager()->ForEachMatchingSession(mPeer, [&lastPeerActivity](auto * session) {
1079 9 : if (!session->IsCASESession())
1080 : {
1081 9 : return;
1082 : }
1083 :
1084 0 : if (session->GetLastPeerActivityTime() > lastPeerActivity)
1085 : {
1086 0 : return;
1087 : }
1088 :
1089 0 : session->MarkAsDefunct();
1090 : });
1091 : }
1092 :
1093 9 : Close(aReason);
1094 9 : }
1095 :
1096 284 : CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aPayload)
1097 : {
1098 284 : System::PacketBufferTLVReader reader;
1099 284 : reader.Init(std::move(aPayload));
1100 :
1101 284 : SubscribeResponseMessage::Parser subscribeResponse;
1102 284 : ReturnErrorOnFailure(subscribeResponse.Init(reader));
1103 :
1104 : #if CHIP_CONFIG_IM_PRETTY_PRINT
1105 284 : subscribeResponse.PrettyPrint();
1106 : #endif
1107 :
1108 284 : SubscriptionId subscriptionId = 0;
1109 284 : VerifyOrReturnError(subscribeResponse.GetSubscriptionId(&subscriptionId) == CHIP_NO_ERROR, CHIP_ERROR_INVALID_ARGUMENT);
1110 284 : VerifyOrReturnError(IsMatchingSubscriptionId(subscriptionId), CHIP_ERROR_INVALID_SUBSCRIPTION);
1111 282 : ReturnErrorOnFailure(subscribeResponse.GetMaxInterval(&mMaxInterval));
1112 :
1113 : #if CHIP_PROGRESS_LOGGING
1114 282 : auto duration = System::Clock::Milliseconds32(System::SystemClock().GetMonotonicTimestamp() - mSubscribeRequestTime);
1115 : #endif
1116 282 : ChipLogProgress(DataManagement,
1117 : "Subscription established in %" PRIu32 "ms with SubscriptionID = 0x%08" PRIx32 " MinInterval = %u"
1118 : "s MaxInterval = %us Peer = %02x:" ChipLogFormatX64,
1119 : duration.count(), mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, GetFabricIndex(),
1120 : ChipLogValueX64(GetPeerNodeId()));
1121 :
1122 282 : ReturnErrorOnFailure(subscribeResponse.ExitContainer());
1123 :
1124 282 : MoveToState(ClientState::SubscriptionActive);
1125 :
1126 282 : mpCallback.OnSubscriptionEstablished(subscriptionId);
1127 :
1128 282 : mNumRetries = 0;
1129 :
1130 282 : ReturnErrorOnFailure(RefreshLivenessCheckTimer());
1131 :
1132 282 : return CHIP_NO_ERROR;
1133 284 : }
1134 :
1135 227 : CHIP_ERROR ReadClient::SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams)
1136 : {
1137 : // Make sure we don't use minimal resubscribe delays from previous attempts
1138 : // for this one.
1139 227 : mMinimalResubscribeDelay = System::Clock::kZero;
1140 :
1141 227 : mReadPrepareParams = std::move(aReadPrepareParams);
1142 227 : CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams);
1143 227 : if (err != CHIP_NO_ERROR)
1144 : {
1145 1 : StopResubscription();
1146 : }
1147 227 : return err;
1148 : }
1149 :
1150 0 : CHIP_ERROR ReadClient::SendAutoResubscribeRequest(const ScopedNodeId & aPublisherId, ReadPrepareParams && aReadPrepareParams)
1151 : {
1152 0 : mPeer = aPublisherId;
1153 0 : mReadPrepareParams = std::move(aReadPrepareParams);
1154 0 : CHIP_ERROR err = EstablishSessionToPeer();
1155 0 : if (err != CHIP_NO_ERROR)
1156 : {
1157 : // Make sure we call our callback's OnDeallocatePaths.
1158 0 : StopResubscription();
1159 : }
1160 0 : return err;
1161 : }
1162 :
1163 316 : CHIP_ERROR ReadClient::SendSubscribeRequest(const ReadPrepareParams & aReadPrepareParams)
1164 : {
1165 316 : VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds,
1166 : CHIP_ERROR_INVALID_ARGUMENT);
1167 :
1168 313 : auto err = SendSubscribeRequestImpl(aReadPrepareParams);
1169 313 : if (CHIP_NO_ERROR != err)
1170 : {
1171 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, err);
1172 : }
1173 313 : return err;
1174 : }
1175 :
1176 313 : CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadPrepareParams)
1177 : {
1178 : MATTER_LOG_METRIC_BEGIN(Tracing::kMetricDeviceSubscriptionSetup);
1179 :
1180 : #if CHIP_PROGRESS_LOGGING
1181 313 : mSubscribeRequestTime = System::SystemClock().GetMonotonicTimestamp();
1182 : #endif
1183 :
1184 313 : VerifyOrReturnError(ClientState::Idle == mState, CHIP_ERROR_INCORRECT_STATE);
1185 :
1186 313 : if (&aReadPrepareParams != &mReadPrepareParams)
1187 : {
1188 76 : mReadPrepareParams.mSessionHolder = aReadPrepareParams.mSessionHolder;
1189 : }
1190 :
1191 313 : mIsPeerLIT = aReadPrepareParams.mIsPeerLIT;
1192 313 : mReadPrepareParams.mRegisteredCheckInToken = aReadPrepareParams.mRegisteredCheckInToken;
1193 :
1194 313 : if (aReadPrepareParams.mRegisteredCheckInToken)
1195 : {
1196 11 : ChipLogProgress(DataManagement, "ICD Check-In token has been registered in peer device " ChipLogFormatScopedNodeId,
1197 : ChipLogValueScopedNodeId(mPeer));
1198 : }
1199 :
1200 313 : mMinIntervalFloorSeconds = aReadPrepareParams.mMinIntervalFloorSeconds;
1201 :
1202 : // Todo: Remove the below, Update span in ReadPrepareParams
1203 313 : Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList,
1204 313 : aReadPrepareParams.mAttributePathParamsListSize);
1205 313 : Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize);
1206 313 : Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
1207 313 : aReadPrepareParams.mDataVersionFilterListSize);
1208 :
1209 313 : System::PacketBufferHandle msgBuf;
1210 313 : System::PacketBufferTLVWriter writer;
1211 313 : SubscribeRequestMessage::Builder request;
1212 313 : InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead);
1213 :
1214 313 : ReturnErrorOnFailure(request.Init(&writer));
1215 :
1216 313 : request.KeepSubscriptions(aReadPrepareParams.mKeepSubscriptions)
1217 313 : .MinIntervalFloorSeconds(aReadPrepareParams.mMinIntervalFloorSeconds)
1218 313 : .MaxIntervalCeilingSeconds(aReadPrepareParams.mMaxIntervalCeilingSeconds);
1219 :
1220 313 : if (!attributePaths.empty())
1221 : {
1222 306 : AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests();
1223 306 : ReturnErrorOnFailure(attributePathListBuilder.GetError());
1224 306 : ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths));
1225 : }
1226 :
1227 313 : if (!eventPaths.empty())
1228 : {
1229 22 : EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests();
1230 22 : ReturnErrorOnFailure(eventPathListBuilder.GetError());
1231 22 : ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));
1232 :
1233 22 : Optional<EventNumber> eventMin;
1234 22 : ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
1235 22 : if (eventMin.HasValue())
1236 : {
1237 0 : EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
1238 0 : ReturnErrorOnFailure(request.GetError());
1239 0 : ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
1240 : }
1241 : }
1242 :
1243 313 : ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError());
1244 :
1245 313 : bool encodedDataVersionList = false;
1246 313 : TLV::TLVWriter backup;
1247 313 : request.Checkpoint(backup);
1248 313 : DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters();
1249 313 : ReturnErrorOnFailure(request.GetError());
1250 313 : if (!attributePaths.empty())
1251 : {
1252 306 : ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters,
1253 : encodedDataVersionList));
1254 : }
1255 313 : ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead));
1256 313 : if (encodedDataVersionList)
1257 : {
1258 65 : ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs());
1259 : }
1260 : else
1261 : {
1262 248 : request.Rollback(backup);
1263 : }
1264 :
1265 313 : ReturnErrorOnFailure(request.EndOfSubscribeRequestMessage());
1266 313 : ReturnErrorOnFailure(writer.Finalize(&msgBuf));
1267 :
1268 313 : VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);
1269 :
1270 313 : auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
1271 313 : if (exchange == nullptr)
1272 : {
1273 0 : if (aReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession())
1274 : {
1275 0 : return CHIP_ERROR_NO_MEMORY;
1276 : }
1277 :
1278 : // Trying to subscribe with a defunct session somehow.
1279 0 : return CHIP_ERROR_INCORRECT_STATE;
1280 : }
1281 :
1282 313 : mExchange.Grab(exchange);
1283 :
1284 313 : if (aReadPrepareParams.mTimeout == System::Clock::kZero)
1285 : {
1286 313 : mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
1287 : }
1288 : else
1289 : {
1290 0 : mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
1291 : }
1292 :
1293 313 : ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::SubscribeRequest, std::move(msgBuf),
1294 : Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
1295 :
1296 313 : mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer();
1297 :
1298 313 : MoveToState(ClientState::AwaitingInitialReport);
1299 :
1300 313 : return CHIP_NO_ERROR;
1301 313 : }
1302 :
1303 6 : CHIP_ERROR ReadClient::DefaultResubscribePolicy(CHIP_ERROR aTerminationCause)
1304 : {
1305 6 : if (aTerminationCause == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT)
1306 : {
1307 0 : ChipLogProgress(DataManagement, "ICD device is inactive, skipping scheduling resubscribe within DefaultResubscribePolicy");
1308 0 : return CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT;
1309 : }
1310 :
1311 6 : VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
1312 :
1313 6 : auto timeTillNextResubscription = ComputeTimeTillNextSubscription();
1314 6 : ChipLogProgress(DataManagement,
1315 : "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32
1316 : "ms due to error %" CHIP_ERROR_FORMAT,
1317 : GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()), mNumRetries, timeTillNextResubscription,
1318 : aTerminationCause.Format());
1319 6 : return ScheduleResubscription(timeTillNextResubscription, NullOptional, aTerminationCause == CHIP_ERROR_TIMEOUT);
1320 : }
1321 :
1322 0 : void ReadClient::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
1323 : const SessionHandle & sessionHandle)
1324 : {
1325 0 : ReadClient * const _this = static_cast<ReadClient *>(context);
1326 0 : VerifyOrDie(_this != nullptr);
1327 :
1328 0 : ChipLogProgress(DataManagement, "HandleDeviceConnected");
1329 0 : _this->mReadPrepareParams.mSessionHolder.Grab(sessionHandle);
1330 0 : _this->mpExchangeMgr = &exchangeMgr;
1331 :
1332 0 : _this->mpCallback.OnCASESessionEstablished(sessionHandle, _this->mReadPrepareParams);
1333 :
1334 0 : auto err = _this->SendSubscribeRequest(_this->mReadPrepareParams);
1335 0 : if (err != CHIP_NO_ERROR)
1336 : {
1337 0 : _this->Close(err);
1338 : }
1339 0 : }
1340 :
1341 0 : void ReadClient::HandleDeviceConnectionFailure(void * context, const OperationalSessionSetup::ConnectionFailureInfo & failureInfo)
1342 : {
1343 0 : ReadClient * const _this = static_cast<ReadClient *>(context);
1344 0 : VerifyOrDie(_this != nullptr);
1345 :
1346 0 : ChipLogError(DataManagement, "Failed to establish CASE for re-subscription with error '%" CHIP_ERROR_FORMAT "'",
1347 : failureInfo.error.Format());
1348 :
1349 : #if CHIP_CONFIG_ENABLE_BUSY_HANDLING_FOR_OPERATIONAL_SESSION_SETUP
1350 : #if CHIP_DETAIL_LOGGING
1351 0 : if (failureInfo.requestedBusyDelay.HasValue())
1352 : {
1353 0 : ChipLogDetail(DataManagement, "Will delay resubscription by %u ms due to BUSY response",
1354 : failureInfo.requestedBusyDelay.Value().count());
1355 : }
1356 : #endif // CHIP_DETAIL_LOGGING
1357 0 : _this->mMinimalResubscribeDelay = failureInfo.requestedBusyDelay.ValueOr(System::Clock::kZero);
1358 : #else
1359 : _this->mMinimalResubscribeDelay = System::Clock::kZero;
1360 : #endif // CHIP_CONFIG_ENABLE_BUSY_HANDLING_FOR_OPERATIONAL_SESSION_SETUP
1361 :
1362 0 : _this->Close(failureInfo.error);
1363 0 : }
1364 :
1365 11 : void ReadClient::OnResubscribeTimerCallback(System::Layer * /* If this starts being used, fix callers that pass nullptr */,
1366 : void * apAppState)
1367 : {
1368 11 : ReadClient * const _this = static_cast<ReadClient *>(apAppState);
1369 11 : VerifyOrDie(_this != nullptr);
1370 :
1371 11 : _this->mIsResubscriptionScheduled = false;
1372 :
1373 : CHIP_ERROR err;
1374 :
1375 11 : ChipLogProgress(DataManagement, "OnResubscribeTimerCallback: ForceCASE = %d", _this->mForceCaseOnNextResub);
1376 11 : _this->mNumRetries++;
1377 :
1378 11 : bool allowResubscribeOnError = true;
1379 22 : if (!_this->mReadPrepareParams.mSessionHolder ||
1380 11 : !_this->mReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession())
1381 : {
1382 : // We don't have an active CASE session. We need to go ahead and set
1383 : // one up, if we can.
1384 0 : if (_this->EstablishSessionToPeer() == CHIP_NO_ERROR)
1385 : {
1386 0 : return;
1387 : }
1388 :
1389 0 : if (_this->mForceCaseOnNextResub)
1390 : {
1391 : // Caller asked us to force CASE but we have no way to do CASE.
1392 : // Just stop trying.
1393 0 : allowResubscribeOnError = false;
1394 : }
1395 :
1396 : // No way to send our subscribe request.
1397 0 : err = CHIP_ERROR_INCORRECT_STATE;
1398 0 : ExitNow();
1399 : }
1400 :
1401 11 : err = _this->SendSubscribeRequest(_this->mReadPrepareParams);
1402 :
1403 11 : exit:
1404 11 : if (err != CHIP_NO_ERROR)
1405 : {
1406 : //
1407 : // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid
1408 : // CASESessionManager pointer when mForceCaseOnNextResub was true.
1409 : //
1410 : // In that case, don't permit re-subscription to occur.
1411 : //
1412 0 : _this->Close(err, allowResubscribeOnError);
1413 : }
1414 : }
1415 :
1416 65 : void ReadClient::UpdateDataVersionFilters(const ConcreteDataAttributePath & aPath)
1417 : {
1418 130 : for (size_t index = 0; index < mReadPrepareParams.mDataVersionFilterListSize; index++)
1419 : {
1420 65 : if (mReadPrepareParams.mpDataVersionFilterList[index].mEndpointId == aPath.mEndpointId &&
1421 65 : mReadPrepareParams.mpDataVersionFilterList[index].mClusterId == aPath.mClusterId)
1422 : {
1423 : // Now we know the current version for this cluster is aPath.mDataVersion.
1424 65 : mReadPrepareParams.mpDataVersionFilterList[index].mDataVersion = aPath.mDataVersion;
1425 : }
1426 : }
1427 65 : }
1428 :
1429 339 : CHIP_ERROR ReadClient::GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional<EventNumber> & aEventMin)
1430 : {
1431 339 : if (aReadPrepareParams.mEventNumber.HasValue())
1432 : {
1433 312 : aEventMin = aReadPrepareParams.mEventNumber;
1434 : }
1435 : else
1436 : {
1437 27 : ReturnErrorOnFailure(mpCallback.GetHighestReceivedEventNumber(aEventMin));
1438 27 : if (aEventMin.HasValue())
1439 : {
1440 : // We want to start with the first event _after_ the last one we received.
1441 2 : aEventMin.SetValue(aEventMin.Value() + 1);
1442 : }
1443 : }
1444 339 : return CHIP_NO_ERROR;
1445 : }
1446 :
1447 196 : bool ReadClient::TriggerResubscribeIfScheduled(const char * reason)
1448 : {
1449 196 : if (!mIsResubscriptionScheduled)
1450 : {
1451 195 : return false;
1452 : }
1453 :
1454 1 : ChipLogDetail(DataManagement, "ReadClient[%p] triggering resubscribe, reason: %s", this, reason);
1455 1 : CancelResubscribeTimer();
1456 1 : OnResubscribeTimerCallback(nullptr, this);
1457 :
1458 1 : return true;
1459 : }
1460 :
1461 0 : Optional<System::Clock::Timeout> ReadClient::GetSubscriptionTimeout()
1462 : {
1463 0 : if (!IsSubscriptionType() || !IsSubscriptionActive())
1464 : {
1465 0 : return NullOptional;
1466 : }
1467 :
1468 : System::Clock::Timeout timeout;
1469 0 : CHIP_ERROR err = ComputeLivenessCheckTimerTimeout(&timeout);
1470 0 : if (err != CHIP_NO_ERROR)
1471 : {
1472 0 : return NullOptional;
1473 : }
1474 :
1475 0 : return MakeOptional(timeout);
1476 : }
1477 :
1478 0 : CHIP_ERROR ReadClient::EstablishSessionToPeer()
1479 : {
1480 0 : ChipLogProgress(DataManagement, "Trying to establish a CASE session for subscription");
1481 0 : auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager();
1482 0 : VerifyOrReturnError(caseSessionManager != nullptr, CHIP_ERROR_INCORRECT_STATE);
1483 0 : caseSessionManager->FindOrEstablishSession(mPeer, &mOnConnectedCallback, &mOnConnectionFailureCallback);
1484 0 : return CHIP_NO_ERROR;
1485 : }
1486 :
1487 : } // namespace app
1488 : } // namespace chip
|