import json
import logging
import urllib.parse

import boto3
import requests

logger = logging.getLogger()
logger.setLevel(logging.INFO)

print('Loading function')

s3 = boto3.client('s3')
wisdom = boto3.client('wisdom')

# Identifier of the Wisdom knowledge base that receives the content.
KBId = '9a0ca038-d50f-4951-8a7f-467d11d69b7b'

# Upper bound, in seconds, on the HTTP PUT that sends the object to Wisdom,
# so a stalled connection cannot hang the invocation indefinitely.
_UPLOAD_TIMEOUT_SECONDS = 60


def lambda_handler(event, context):
    """Copy the S3 object named in an S3 event into the Wisdom knowledge base.

    Only the first record of ``event['Records']`` is processed. The handler:

    1. searches the knowledge base for content whose name equals the key
       (the result is only logged),
    2. downloads the object from S3,
    3. starts a Wisdom content upload and PUTs the object bytes to the
       returned URL with the returned headers,
    4. calls ``create_content`` to finalize the upload under the object's key.

    Args:
        event: S3 event notification payload (a dict with a ``Records`` list).
        context: Lambda context object; unused.

    Returns:
        None.

    Raises:
        Exception: whatever the failing S3, Wisdom or HTTP call raised; the
            error is logged with its traceback and then re-raised unchanged.
    """
    # Get the object from the event and show its content type
    record = event['Records'][0]['s3']
    bucket = record['bucket']['name']
    # S3 URL-encodes object keys in event payloads (spaces arrive as '+').
    key = urllib.parse.unquote_plus(record['object']['key'], encoding='utf-8')
    # 'versionId' is only present when the bucket has versioning enabled.
    version = record['object'].get('versionId')
    logger.info('Bucket: %s Key: %s Version: %s', bucket, key, version)

    # NOTE(review): the search result is only logged; presumably it was meant
    # to decide between creating and updating content - confirm with owner.
    response = wisdom.search_content(
        knowledgeBaseId=KBId,
        searchExpression={
            'filters': [
                {'field': 'NAME', 'operator': 'EQUALS', 'value': key},
            ]
        },
    )
    logger.info('%s', response)

    get_object_args = {'Bucket': bucket, 'Key': key}
    if version is not None:
        get_object_args['VersionId'] = version
    try:
        s3_response = s3.get_object(**get_object_args)
    except Exception:
        logger.exception('Error getting object.')
        raise
    logger.info('%s', s3_response)

    content_type = s3_response['ContentType']
    logger.info('%s', content_type)

    try:
        upload = wisdom.start_content_upload(
            knowledgeBaseId=KBId,
            contentType=content_type,
        )
    except Exception:
        logger.exception('Error uploading content.')
        raise
    logger.info('%s', upload)

    # start_content_upload only reserves an upload slot: the bytes must be
    # PUT to the returned URL, with the returned headers, before
    # create_content can finalize the content.
    try:
        put_response = requests.put(
            upload['url'],
            data=s3_response['Body'].read(),
            headers=upload['headersToInclude'],
            timeout=_UPLOAD_TIMEOUT_SECONDS,
        )
        put_response.raise_for_status()
    except Exception:
        logger.exception('Error sending content to the upload URL.')
        raise

    upload_id = upload['uploadId']
    logger.info('%s', upload_id)

    create_args = {
        'knowledgeBaseId': KBId,
        'name': key,
        'uploadId': upload_id,
    }
    if version is not None:
        # Record which S3 version this content was created from.
        create_args['metadata'] = {'sourceS3Version': version}
    try:
        content_response = wisdom.create_content(**create_args)
    except Exception:
        logger.exception('Error creating content.')
        raise
    logger.info('%s', content_response)
Comments (0)
HTTPS / SSH
You can clone a snippet to your computer for local editing.
Learn more.