Problem storing Kinesis data on S3 and DynamoDB

Hi, I'm building a big data streaming pipeline that sends a video stream from a camera through Kinesis to trigger a Lambda function. The Lambda function then uses Amazon Rekognition to detect objects; the images are stored in S3 and their metadata in DynamoDB. My problem is that the first frame of the video is stored in S3 and DynamoDB over and over (the same image every time). Here is the Lambda code (the main function):

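import base64
import datetime
import decimal
import pickle  # Python 3 replacement for cPickle
import uuid

import boto3

def process_image(event, context):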

    #Initialize clients
    rekog_client = boto3.client('rekognition')
    s3_client = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb')

    s3_bucket = ...
    s3_key_frames_root = ...

    ddb_table = dynamodb.Table(...)
    rekog_max_labels = ...
    rekog_min_conf = float(...)
    label_watch_list = ...
    label_watch_min_conf = ...

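    #Iterate on frames fetched from Kinesis
    for record in event['Records']:
        frame_package_b64 = record['kinesis']['data']
        frame_package = pickle.loads(base64.b64decode(frame_package_b64))
        frame_count = frame_package["FrameCount"]
        #Assumption: the producer stores the JPEG bytes under "ImageBytes"
        img_bytes = frame_package["ImageBytes"]

        rekog_response = rekog_client.detect_labels(
            Image={'Bytes': img_bytes},
            MaxLabels=rekog_max_labels,
            MinConfidence=rekog_min_conf
        )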

        #Iterate on rekognition labels. Enrich and prep them for storage in DynamoDB
        labels_on_watch_list = []
        for label in rekog_response['Labels']:
            lbl = label['Name']
            conf = label['Confidence']
            label['OnWatchList'] = False

            #Print labels and confidence to lambda console
            print('{} .. conf %{:.2f}'.format(lbl, conf))

            #Check label watch list and trigger action
            if (lbl.upper() in (w.upper() for w in label_watch_list)
                and conf >= label_watch_min_conf):

                label['OnWatchList'] = True

            #Convert from float to decimal for DynamoDB
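            #Convert from float to Decimal for DynamoDB; going through str()
            #avoids the long binary expansion that Decimal(float) produces
            label['Confidence'] = decimal.Decimal(str(conf))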

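        #Build a per-frame key. Assumption: the original code does not show how
        #Frame_id and the date parts were set, so a fresh UUID and the current
        #UTC processing time are used here.
        Frame_id = str(uuid.uuid4())
        now = datetime.datetime.now(datetime.timezone.utc)
        year = now.strftime('%Y')
        mon = now.strftime('%m')
        day = now.strftime('%d')
        hour = now.strftime('%H')

        #Store frame image in S3
        s3_key = (s3_key_frames_root + '{}/{}/{}/{}/{}.jpg').format(year, mon, day, hour, Frame_id)
        print("s3_key is: " + s3_key)
        s3_client.put_object(Bucket=s3_bucket, Key=s3_key, Body=img_bytes)

        #Persist frame data in DynamoDB. Newer detect_labels responses may also
        #nest floats (e.g. Instances/BoundingBox) that need Decimal conversion.
        item = {
            'Frame_id': Frame_id,
            'rekog_labels': rekog_response['Labels'],
            'rekog_orientation_correction':
                rekog_response.get('OrientationCorrection', 'ROTATE_0'),
            's3_bucket': s3_bucket,
            's3_key': s3_key
        }
        ddb_table.put_item(Item=item)
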
    print('Successfully processed {} records.'.format(len(event['Records'])))

def lambda_handler(event, context):
    return process_image(event, context)
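
One way to narrow down where the repetition comes from is to log a hash of each frame's bytes next to its FrameCount. If FrameCount advances while the hash stays the same, the producer is putting the same image on the stream and the Lambda is simply storing what it receives. Separately, when a Kinesis-triggered invocation raises an error, Lambda retries the same batch, which can also store the same frames again, so it is worth checking the logs for failed invocations. A minimal sketch, assuming the same pickled package layout as above and a hypothetical "ImageBytes" key for the JPEG data (the producer is not shown in the question); `frame_digests` and `has_repeated_frames` are illustrative names:

```python
import base64
import hashlib
import pickle


def frame_digests(event):
    """Return (FrameCount, SHA-256 hex digest of the image) for each record.

    Assumes each Kinesis record holds a pickled dict with "FrameCount" and a
    hypothetical "ImageBytes" key containing the raw JPEG bytes.
    """
    digests = []
    for record in event["Records"]:
        package = pickle.loads(base64.b64decode(record["kinesis"]["data"]))
        digest = hashlib.sha256(package["ImageBytes"]).hexdigest()
        digests.append((package["FrameCount"], digest))
    return digests


def has_repeated_frames(event):
    """True if two or more records in the batch carry byte-identical images."""
    hashes = [digest for _, digest in frame_digests(event)]
    return len(hashes) != len(set(hashes))


if __name__ == "__main__":
    # Fake two-record batch in which the producer re-sent the same image bytes.
    def fake_record(count, img):
        data = pickle.dumps({"FrameCount": count, "ImageBytes": img})
        return {"kinesis": {"data": base64.b64encode(data).decode("ascii")}}

    event = {"Records": [fake_record(1, b"frame-1"), fake_record(2, b"frame-1")]}
    for count, digest in frame_digests(event):
        print(count, digest[:12])
    print("repeated frames:", has_repeated_frames(event))
```

Printing these digests at the top of `process_image` puts them in CloudWatch Logs for every invocation, so repeats across batches become visible too.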


Babak Shahian (Mechanical Engineer) asked:

Shalom Carmel (CTO) commented:
So what is the problem?
Which attribute in the DynamoDB record is problematic? Can you post an example?
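
If the problematic attribute turns out to be `rekog_labels`: it stores Rekognition's label list as-is, and besides the top-level `Confidence` it may contain other floats (for example `Instances` with `BoundingBox` coordinates and their own `Confidence`, depending on the API version). The boto3 DynamoDB resource raises a TypeError for Python floats, and `Decimal(float)` can carry more digits than DynamoDB's 38-digit precision. A minimal sketch of a recursive converter that could be applied to the whole label list before `put_item`; `floats_to_decimal` is a hypothetical helper name:

```python
import decimal


def floats_to_decimal(obj):
    """Return a copy of obj with every float replaced by decimal.Decimal.

    str(float) gives the shortest round-tripping repr (e.g. "98.76"), so the
    Decimal does not inherit the float's long binary expansion. Values that
    are already Decimal, ints, bools and strings pass through unchanged.
    """
    if isinstance(obj, float):
        return decimal.Decimal(str(obj))
    if isinstance(obj, dict):
        return {key: floats_to_decimal(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [floats_to_decimal(value) for value in obj]
    return obj


if __name__ == "__main__":
    # Shape loosely modeled on a detect_labels label entry (illustrative values).
    labels = [{"Name": "Person", "Confidence": 98.76,
               "Instances": [{"BoundingBox": {"Width": 0.25, "Height": 0.5,
                                              "Left": 0.1, "Top": 0.2},
                              "Confidence": 98.76}],
               "Parents": [], "OnWatchList": True}]
    converted = floats_to_decimal(labels)
    print(converted[0]["Confidence"])  # → 98.76
```

A recursive walk is used instead of a JSON round-trip because it also tolerates values that were already converted to Decimal earlier in the handler.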