lambda_function.py
import json
from io import StringIO

import boto3
import pandas as pd

s3 = boto3.client('s3')
sns = boto3.client('sns')
sns_arn = 'arn:aws:sns:us-east-1:025066280149:speed-dash-sns'
s3_target_bucket = 'speed-dash-target-zone'


def lambda_handler(event, context):
    # Source bucket and object key from the S3 put event that triggered this run.
    landing_bucket = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']

    # Read the raw JSON object from the landing bucket into a DataFrame.
    response = s3.get_object(Bucket=landing_bucket, Key=object_key)
    json_data = response['Body'].read().decode('utf-8')
    df = pd.read_json(StringIO(json_data))

    # Keep only the delivered orders.
    df = df[df['status'] == 'delivered']

    # Write the filtered records to the target bucket, keyed by the first
    # 10 characters of the source key (the date prefix).
    target_key = f'{object_key[:10]}_delivered.json'
    s3.put_object(Bucket=s3_target_bucket, Key=target_key, Body=df.to_json(orient='records'))

    # Notify subscribers that the daily file was processed successfully.
    message = (
        f'Input S3 file s3://{landing_bucket}/{object_key} has been processed successfully '
        f'and uploaded to the destination bucket s3://{s3_target_bucket}/{target_key}.'
    )
    sns.publish(Subject='SUCCESS - Daily Speed Dash Data Processing',
                TargetArn=sns_arn, Message=message, MessageStructure='text')

    return {
        'statusCode': 200,
        'body': json.dumps('Data pipeline successful!')
    }
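

# --- Local test sketch (an addition for illustration, not part of the deployed function) ---
# A minimal, hand-built S3 put event for exercising lambda_handler locally.
# The landing bucket name and object key below are hypothetical; running this
# requires AWS credentials plus the referenced buckets and SNS topic to exist.
if __name__ == '__main__':
    sample_event = {
        'Records': [{
            's3': {
                'bucket': {'name': 'speed-dash-landing-zone'},  # hypothetical landing bucket
                'object': {'key': '2024-01-01_orders.json'}     # hypothetical key; first 10 chars give the date prefix
            }
        }]
    }
    print(lambda_handler(sample_event, None))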