Snippets

David Macias Amazon Connect Wisdom Boto3

Created by David Macias last modified
import json
import urllib.parse
import boto3
import logging
import requests

logger = logging.getLogger()
logger.setLevel(logging.INFO)

print('Loading function')

s3 = boto3.client('s3')
wisdom = boto3.client('wisdom')
KBId = '9a0ca038-d50f-4951-8a7f-467d11d69b7b'

def lambda_handler(event, context):
    
    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    version = event['Records'][0]['s3']['object']['versionId']
    
    print('Bucket: '+bucket+' Key: '+key+' Version: '+version)
    
    response = wisdom.search_content(
        knowledgeBaseId=KBId,
        searchExpression={
            'filters': [
                {
                    'field': 'NAME',
                    'operator': 'EQUALS',
                    'value': key
                },
                ]
        })
    
    print(response)
    
    try:
        s3Response = s3.get_object(Bucket=bucket, Key=key, VersionId=version)
        print(s3Response)
    except Exception as e:
        print(e)
        print('Error getting object.')
        raise 3
    
    print(s3Response['ContentType'])

    try:
        wisdomResponse = wisdom.start_content_upload(knowledgeBaseId=KBId, contentType=s3Response['ContentType'])
        print(wisdomResponse)
    except Exception as e:
        print(e)
        print('Error uploading content.')
        raise 3
        
    print(wisdomResponse['uploadId'])
        
    try:
        contentResponse = wisdom.create_content(knowledgeBaseId=KBId, name=key, uploadId=wisdomResponse['uploadId'], metadata={'sourceS3Version':version})
        print(contentResponse)
    except Exception as e:
        print(e)
        print('Error creating content.')
        raise 3
    

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.