Asked by Babak Shahian

Problem storing Kinesis data on S3 and DynamoDB

Hi, I'm building a big data streaming pipeline that takes frames from a camera through Kinesis to trigger a Lambda function. The Lambda function uses Amazon Rekognition (AWS machine learning) to detect objects; the images are stored in S3 and their metadata in DynamoDB. My problem is that the first frame of the video is being stored in S3 and DynamoDB repeatedly (the same image every time). Here is the Lambda code (the main function):
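For reference, the Lambda code expects each Kinesis record to carry a pickled dict with at least `ImageBytes` and `FrameCount` keys. Below is a minimal sketch of that round trip, assuming the producer pickles such a dict and sends the raw bytes with the boto3 Kinesis client's `put_record` (Lambda then receives the payload base64-encoded in `record['kinesis']['data']`). The helper names `pack_frame` and `unpack_record` are hypothetical, and the fake record only imitates the one field the handler decodes.

```python
import base64
import pickle


def pack_frame(img_bytes, frame_count):
    """Producer side (hypothetical): serialize one frame as a pickled dict.

    The resulting bytes would be sent with something like
    kinesis.put_record(StreamName=..., Data=payload, PartitionKey=...).
    """
    return pickle.dumps({"ImageBytes": img_bytes, "FrameCount": frame_count})


def unpack_record(record):
    """Consumer side: mirrors the decode step in process_image()."""
    return pickle.loads(base64.b64decode(record["kinesis"]["data"]))


# Simulate what Lambda receives: the payload arrives base64-encoded.
payload = pack_frame(b"\xff\xd8fake-jpeg-bytes", 7)
fake_record = {"kinesis": {"data": base64.b64encode(payload).decode("ascii")}}

frame = unpack_record(fake_record)
print(frame["FrameCount"], len(frame["ImageBytes"]))
```

Because every record decodes independently, each loop iteration should work only with its own `ImageBytes`; state carried over between iterations breaks that. Only unpickle data from a trusted producer, since `pickle.loads` can execute arbitrary code.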

import base64
import datetime
import decimal
from copy import deepcopy

import boto3

try:
    import cPickle as pickle  # Python 2, as in the original code
except ImportError:
    import pickle  # Python 3

def process_image(event, context):

    # Initialize clients
    rekog_client = boto3.client('rekognition')
    s3_client = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb')

    s3_bucket = ...
    s3_key_frames_root = ...

    ddb_table = dynamodb.Table(...)
    rekog_max_labels = ...
    rekog_min_conf = float(...)
    label_watch_list = ...
    label_watch_min_conf = ...

    # Upper-case the watch list once rather than once per label
    watch_list_upper = set(w.upper() for w in label_watch_list)

    # Iterate on frames fetched from Kinesis
    for record in event['Records']:

        frame_package_b64 = record['kinesis']['data']
        frame_package = pickle.loads(base64.b64decode(frame_package_b64))
        # Take this record's image (the original appended to an undefined
        # list, and Rekognition expects raw bytes, not a list)
        img_bytes = frame_package["ImageBytes"]
        frame_count = frame_package["FrameCount"]

        # Assumption: Frame_id and year/mon/day/hour are not defined in the
        # original snippet; here they come from this Kinesis record, so every
        # frame gets its own S3 key and DynamoDB item
        Frame_id = record['kinesis']['sequenceNumber']
        arrival = datetime.datetime.utcfromtimestamp(
            record['kinesis']['approximateArrivalTimestamp'])
        year, mon, day, hour = arrival.strftime('%Y %m %d %H').split()

        rekog_response = rekog_client.detect_labels(
            Image={
                'Bytes': img_bytes
            },
            MaxLabels=rekog_max_labels,
            MinConfidence=rekog_min_conf
        )

        # Iterate on Rekognition labels. Enrich and prep them for storage in DynamoDB
        labels_on_watch_list = []
        for label in rekog_response['Labels']:

            lbl = label['Name']
            conf = label['Confidence']
            label['OnWatchList'] = False

            # Print labels and confidence to the Lambda console
            print('{} .. conf %{:.2f}'.format(lbl, conf))

            # Check label watch list and trigger action
            if lbl.upper() in watch_list_upper and conf >= label_watch_min_conf:
                label['OnWatchList'] = True
                labels_on_watch_list.append(deepcopy(label))

            # Convert from float to Decimal for DynamoDB; going through str()
            # keeps the value within DynamoDB's 38-digit precision
            label['Confidence'] = decimal.Decimal(str(conf))

        # Store frame image in S3
        s3_key = (s3_key_frames_root + '{}/{}/{}/{}/{}.jpg').format(year, mon, day, hour, Frame_id)
        print("s3_key is: " + str(s3_key))
        s3_client.put_object(
            Bucket=s3_bucket,
            Key=s3_key,
            Body=img_bytes
        )

        # Persist frame data in DynamoDB
        item = {
            'Frame_id': Frame_id,
            'rekog_labels': rekog_response['Labels'],
            'rekog_orientation_correction':
                rekog_response.get('OrientationCorrection', 'ROTATE_0'),
            's3_bucket': s3_bucket,
            's3_key': s3_key
        }

        ddb_table.put_item(Item=item)

    print('Successfully processed {} records.'.format(len(event['Records'])))
    return

def lambda_handler(event, context):
    return process_image(event, context)

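When the function raises an error, Lambda retries the whole Kinesis batch, so the same records can be processed more than once. Below is a minimal sketch of deriving the frame ID and S3 key from the record itself, assuming the standard fields of a Lambda Kinesis event record (`sequenceNumber`, `approximateArrivalTimestamp`). `frame_key` is a hypothetical helper, and the `frames/` prefix stands in for `s3_key_frames_root`. A retried record maps to the same key (an overwrite), while distinct records map to distinct keys.

```python
import datetime


def frame_key(record, key_root="frames/"):
    """Build an S3 key that is unique per Kinesis record (hypothetical helper).

    Uses the record's sequence number as the frame ID and its approximate
    arrival time (UTC) for the year/month/day/hour prefix.
    """
    kinesis = record["kinesis"]
    frame_id = kinesis["sequenceNumber"]
    arrival = datetime.datetime.fromtimestamp(
        kinesis["approximateArrivalTimestamp"], tz=datetime.timezone.utc)
    key = "{}{:%Y/%m/%d/%H}/{}.jpg".format(key_root, arrival, frame_id)
    return frame_id, key


records = [
    {"kinesis": {"sequenceNumber": "4959001", "approximateArrivalTimestamp": 1700000000.0}},
    {"kinesis": {"sequenceNumber": "4959002", "approximateArrivalTimestamp": 1700000001.5}},
]
for r in records:
    print(frame_key(r))
```

Sequence numbers are only guaranteed unique within a shard. With several shards, the record's `eventID` (which includes the shard ID) is a safer basis for the key.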

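The boto3 DynamoDB resource rejects Python `float` values, and `decimal.Decimal(conf)` built directly from a float can carry more significant digits than DynamoDB's 38-digit limit, which boto3's serializer also rejects. Depending on the Rekognition API version, each label may also contain nested floats (for example bounding boxes under `Instances`), which converting only `Confidence` does not cover. Below is a minimal sketch of a recursive converter that goes through `str()`; `to_dynamodb` is a hypothetical helper name, and the sample label is made up for illustration.

```python
import decimal


def to_dynamodb(value):
    """Recursively replace floats with Decimals built from their str() form."""
    if isinstance(value, float):
        return decimal.Decimal(str(value))
    if isinstance(value, dict):
        return {k: to_dynamodb(v) for k, v in value.items()}
    if isinstance(value, list):
        return [to_dynamodb(v) for v in value]
    return value


labels = [{
    "Name": "Person",
    "Confidence": 87.25,
    "Instances": [{"BoundingBox": {"Width": 0.1, "Height": 0.5}}],
}]
print(to_dynamodb(labels))
```

In the handler, the whole item could be passed through it before writing, e.g. `ddb_table.put_item(Item=to_dynamodb(item))`.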
Shalom Carmel

So what exactly is the problem?
Which attribute in the DynamoDB record is wrong? Can you post an example?
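One way to see which attribute repeats is to compare the stored frames directly. Below is a minimal sketch, assuming the image bytes for each `s3_key` have already been downloaded (for example with `s3_client.get_object(Bucket=..., Key=...)['Body'].read()`). `find_duplicates` is a hypothetical helper that groups keys by a SHA-256 hash of their image bytes, so identical images show up together even when their keys differ; the sample keys and bytes are made up.

```python
import hashlib
from collections import defaultdict


def find_duplicates(frames):
    """Group S3 keys whose stored image bytes are identical.

    `frames` maps an S3 key to the image bytes stored under it.
    Returns a list of key groups that share the same SHA-256 digest.
    """
    by_digest = defaultdict(list)
    for key, img_bytes in frames.items():
        by_digest[hashlib.sha256(img_bytes).hexdigest()].append(key)
    return [sorted(keys) for keys in by_digest.values() if len(keys) > 1]


frames = {
    "frames/2023/11/14/22/1.jpg": b"frame-one",
    "frames/2023/11/14/22/2.jpg": b"frame-one",
    "frames/2023/11/14/22/3.jpg": b"frame-three",
}
print(find_duplicates(frames))
```

If the groups contain different keys, the image bytes themselves repeat. If every item shares one `s3_key`, the key is the attribute that fails to change between frames.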