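import time


def wait_for_log_contains(self, text, level='info'):
    # Poll the captured log messages until `text` appears or the retries run out.
    timeout = 1    # seconds to sleep between checks
    max_tries = 5  # number of checks before giving up
    while max_tries > 0:
        if any(text in s for s in self.consumer_log_messages[level]):
            return True
        time.sleep(timeout)
        max_tries -= 1
    # Text never appeared: fail the calling test.
    self.fail("Log message was not found %s" % text)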