Babak Shahian
asked on
Problem storing Kinesis data on S3 and DynamoDB
Hi, I'm building a big data streaming pipeline that takes frames from a camera and sends them through Kinesis to trigger a Lambda function. The Lambda function then uses AWS machine learning (Rekognition) to detect objects; the images are stored in S3 and their metadata is stored in DynamoDB. My problem is that the first frame of the video is being stored in S3 and DynamoDB repeatedly (the same image keeps being stored). Here is the Lambda code (the main function):
def process_image(event, context):
    #Initialize clients
    rekog_client = boto3.client('rekognition')
    s3_client = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb')
    s3_bucket = ...
    s3_key_frames_root = ...
    ddb_table = dynamodb.Table(...)
    rekog_max_labels = ...
    rekog_min_conf = float(...)
    label_watch_list = ...
    label_watch_min_conf = ...
    #Iterate on frames fetched from Kinesis
    for record in event['Records']:
        frame_package_b64 = record['kinesis']['data']
        frame_package = cPickle.loads(base64.b64decode(frame_package_b64))
        img_bytes.append(frame_package["ImageBytes"])
        frame_count = frame_package["FrameCount"]
        rekog_response = rekog_client.detect_labels(
            Image={
                'Bytes': img_bytes
            },
            MaxLabels=rekog_max_labels,
            MinConfidence=rekog_min_conf
        )
        #Iterate on rekognition labels. Enrich and prep them for storage in DynamoDB
        labels_on_watch_list = []
        for label in rekog_response['Labels']:
            lbl = label['Name']
            conf = label['Confidence']
            label['OnWatchList'] = False
            #Print labels and confidence to lambda console
            print('{} .. conf %{:.2f}'.format(lbl, conf))
            #Check label watch list and trigger action
            if (lbl.upper() in (label.upper() for label in label_watch_list)
                    and conf >= label_watch_min_conf):
                label['OnWatchList'] = True
                labels_on_watch_list.append(deepcopy(label))
            #Convert from float to decimal for DynamoDB
            label['Confidence'] = decimal.Decimal(conf)
        #Store frame image in S3
        s3_key = (s3_key_frames_root + '{}/{}/{}/{}/{}.jpg').format(year, mon, day, hour, Frame_id)
        print("s3_key is: "+str(s3_key))
        s3_client.put_object(
            Bucket=s3_bucket,
            Key=s3_key,
            Body=img_bytes
        )
        #Persist frame data in dynamodb
        item = {
            'Frame_id': Frame_id,
            'rekog_labels' : rekog_response['Labels'],
            'rekog_orientation_correction' :
                rekog_response['OrientationCorrection']
                if 'OrientationCorrection' in rekog_response else 'ROTATE_0',
            's3_bucket' : s3_bucket,
            's3_key' : s3_key
        }
        ddb_table.put_item(Item=item)
    print('Successfully processed {} records.'.format(len(event['Records'])))
    return

def lambda_handler(event, context):
    return process_image(event, context)
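One thing that may be worth checking, sketched here under assumptions: the excerpt appends each frame to img_bytes but does not show where img_bytes is created or reset, so it is possible that the same leading element keeps being used for every record. The sketch below is not the original code. It assumes Python 3 (so pickle rather than cPickle) and that each Kinesis record carries a pickled dict with the ImageBytes and FrameCount keys seen above. The names decode_frames and fake_event are hypothetical and exist only for local testing.

```python
import base64
import pickle


def decode_frames(event):
    """Return one (frame_count, image_bytes) tuple per Kinesis record."""
    frames = []
    for record in event['Records']:
        # Kinesis hands Lambda each payload base64-encoded; the producer is
        # assumed to have pickled a dict with "ImageBytes" and "FrameCount".
        package = pickle.loads(base64.b64decode(record['kinesis']['data']))
        # Rebind per record instead of appending to a shared list, so this
        # variable only ever holds the bytes of the current frame.
        img_bytes = package["ImageBytes"]
        frames.append((package["FrameCount"], img_bytes))
    return frames


def fake_event(images):
    """Hypothetical helper: build a Kinesis-style event for local testing."""
    records = []
    for count, image in enumerate(images):
        package = {"ImageBytes": image, "FrameCount": count}
        data = base64.b64encode(pickle.dumps(package)).decode('ascii')
        records.append({'kinesis': {'data': data}})
    return {'Records': records}


if __name__ == '__main__':
    event = fake_event([b'frame-0', b'frame-1', b'frame-2'])
    for frame_count, img_bytes in decode_frames(event):
        print(frame_count, img_bytes)
```

If every record decodes to different bytes in a local run like this, the duplication is more likely to come from how Frame_id and the S3 key are generated, or from the producer side, than from the decoding step.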
Which attribute in the DynamoDB record is problematic? Can you post an example?