Problem storing Kinesis data on S3 and DynamoDB

Babak Shahian
Hi, I'm building a big data streaming pipeline that takes frames from a camera and sends them through Kinesis to trigger a Lambda function. The Lambda function then uses AWS machine learning (Rekognition) to detect objects; the images are stored in S3 and their metadata is stored in DynamoDB (DDB). My problem is that the first frame of the video is being stored in S3 and DynamoDB repeatedly (the same image is being stored). Here is the Lambda code (the main function):

def process_image(event, context):

    #Initialize clients
    rekog_client = boto3.client('rekognition')
    s3_client = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb')

    s3_bucket = ...
    s3_key_frames_root = ...

    ddb_table = dynamodb.Table(...)
    rekog_max_labels = ...
    rekog_min_conf = float(...)
    label_watch_list = ...
    label_watch_min_conf = ...

    #Iterate on frames fetched from Kinesis
    for record in event['Records']:
        
        frame_package_b64 = record['kinesis']['data']
        frame_package = cPickle.loads(base64.b64decode(frame_package_b64))
        img_bytes.append(frame_package["ImageBytes"])
        frame_count = frame_package["FrameCount"]

        rekog_response = rekog_client.detect_labels(
            Image={
                'Bytes': img_bytes
            },
            MaxLabels=rekog_max_labels,
            MinConfidence=rekog_min_conf
        )

        #Iterate on rekognition labels. Enrich and prep them for storage in DynamoDB
        labels_on_watch_list = []
        for label in rekog_response['Labels']:
            
            lbl = label['Name']
            conf = label['Confidence']
            label['OnWatchList'] = False

            #Print labels and confidence to lambda console
            print('{} .. conf %{:.2f}'.format(lbl, conf))

            #Check label watch list and trigger action
            if (lbl.upper() in (label.upper() for label in label_watch_list)
                and conf >= label_watch_min_conf):

                label['OnWatchList'] = True
                labels_on_watch_list.append(deepcopy(label))

            #Convert from float to decimal for DynamoDB
            label['Confidence'] = decimal.Decimal(conf)

        #Store frame image in S3
        s3_key = (s3_key_frames_root + '{}/{}/{}/{}/{}.jpg').format(year, mon, day, hour, Frame_id)
        print("s3_key is: "+str(s3_key))
        s3_client.put_object(
            Bucket=s3_bucket,
            Key=s3_key,
            Body=img_bytes
        )

        #Persist frame data in dynamodb
        item = {
            'Frame_id': Frame_id,
            'rekog_labels' : rekog_response['Labels'],
            'rekog_orientation_correction' : 
                rekog_response['OrientationCorrection'] 
                if 'OrientationCorrection' in rekog_response else 'ROTATE_0',
            's3_bucket' : s3_bucket,
            's3_key' : s3_key
        }

        ddb_table.put_item(Item=item)
        
    print('Successfully processed {} records.'.format(len(event['Records'])))
    return

def lambda_handler(event, context):
    return process_image(event, context)

So what exactly is the problem?
Which attribute in the DynamoDB record is problematic? Can you post an example?
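
To help answer that, it may be worth checking whether the records arriving from Kinesis really contain different images. Below is a minimal diagnostic sketch, not a fix. It decodes each record the same way the posted handler does and returns a SHA-256 digest per frame. The helper names `summarize_frames` and `has_duplicate_frames` are hypothetical, and it uses Python 3's `pickle` in place of the `cPickle` call in the question. If the producer pickles frames under Python 2, the `pickle.loads` call may need an `encoding` argument.

```python
import base64
import hashlib
import pickle


def summarize_frames(event):
    """Return one (frame_count, sha256_hex) pair per Kinesis record."""
    summaries = []
    for record in event['Records']:
        # Decode this record only; nothing is carried over between iterations.
        frame_package = pickle.loads(base64.b64decode(record['kinesis']['data']))
        # Assign the image bytes fresh for every record (no append).
        img_bytes = frame_package['ImageBytes']
        digest = hashlib.sha256(img_bytes).hexdigest()
        summaries.append((frame_package['FrameCount'], digest))
    return summaries


def has_duplicate_frames(summaries):
    """True if two or more records carry byte-identical image data."""
    digests = [digest for _, digest in summaries]
    return len(set(digests)) < len(digests)
```

Printing the result of `summarize_frames(event)` at the top of the handler should show whether the duplicates come from the producer (identical digests) or from the Lambda code (different digests but the same stored image). In the second case, look at how `img_bytes` and `Frame_id` are set: the posted code calls `img_bytes.append(...)` without showing where `img_bytes` is created, and `Frame_id`, `year`, `mon`, `day` and `hour` are not defined in the excerpt.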
