fix: support china region deploy #102

Merged 2 commits on Feb 5, 2024
28 changes: 25 additions & 3 deletions README.md
@@ -73,17 +73,17 @@

**Quick Start Tutorial**

**Extract documents from a specified S3 bucket and prefix. POST https://xxxx.execute-api.us-east-1.amazonaws.com/v1/extract. Use the need_split flag to configure whether the extracted document is split semantically or kept as the original content.**
```bash
BODY
{
    "s3_bucket": "<Your S3 bucket>",   e.g. "llm-bot-resource"
    "s3_prefix": "<Your S3 prefix>",   e.g. "input_samples/"
    "need_split": true
}
```
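A minimal client sketch for this call (the helper names are hypothetical, and the API base URL is a placeholder for your deployed API Gateway stage):

```python
import json
import urllib.request


def build_extract_request(api_base: str, bucket: str, prefix: str, need_split: bool = True):
    """Build the URL and JSON body for the /v1/extract call."""
    url = f"{api_base}/v1/extract"
    body = {
        "s3_bucket": bucket,
        "s3_prefix": prefix,
        "need_split": need_split,
    }
    return url, json.dumps(body)


def post_extract(api_base: str, bucket: str, prefix: str, need_split: bool = True) -> str:
    """POST the extract request and return the raw response body.

    Requires a deployed stack; this performs a real network call.
    """
    url, payload = build_extract_request(api_base, bucket, prefix, need_split)
    req = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode("utf-8")
```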

**Offline (asynchronous) process to batch process documents in a specified S3 bucket and prefix. The process includes extracting and splitting document content, converting it to vector representations, and injecting it into Amazon OpenSearch (AOS). POST https://xxxx.execute-api.us-east-1.amazonaws.com/v1/etl**
```bash
BODY
{
@@ -98,9 +98,31 @@

You should see output similar to this:
```bash
{
    "execution_id": "4dd19f1c-45e1-4d18-9d70-7593f96d001a",
    "step_function_arn": "arn:aws:states:us-east-1:<account_id>:execution:ETLStateA5DEA10E-Tgtw66LqdlNH:4dd19f1c-45e1-4d18-9d70-7593f96d001a",
    "input_payload": "{\"s3Bucket\": \"<Your S3 bucket>\", \"s3Prefix\": \"<Your S3 prefix>\", \"offline\": \"true\", \"qaEnhance\": \"false\", \"aosIndex\": \"<Your OpenSearch index>\"}"
}
```
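The `execution_id` in the response above is what you hand to the status endpoint later; a small parsing sketch (the sample payload below is illustrative, not a real execution):

```python
import json


def parse_etl_response(response_text: str) -> str:
    """Pull the execution_id out of the /v1/etl response body."""
    data = json.loads(response_text)
    return data["execution_id"]


# Illustrative sample mirroring the documented response shape
sample = json.dumps({
    "execution_id": "4dd19f1c-45e1-4d18-9d70-7593f96d001a",
    "step_function_arn": "arn:aws:states:us-east-1:123456789012:execution:ETLState:4dd19f1c",
    "input_payload": "{\"offline\": \"true\"}",
})
print(parse_etl_response(sample))  # 4dd19f1c-45e1-4d18-9d70-7593f96d001a
```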

**Get ETL status by execution ID. GET https://xxxx.execute-api.us-east-1.amazonaws.com/v1/etl/status**
```bash
Params
https://xxxx.execute-api.us-east-1.amazonaws.com/v1/etl/status?executionId=24c9bfdb-f604-4bb2-9495-187b3a38be75
```

You should see output similar to this:
```bash
{
"execution_id": "4dd19f1c-45e1-4d18-9d70-7593f96d001a",
"execution_status": "FAILED"
}
```
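Since the offline process can take minutes per document, a simple polling loop around this endpoint is the usual pattern. A sketch under the assumption that `execution_status` carries standard Step Functions states (the helper names are hypothetical, and both functions perform real network calls against a deployed stack):

```python
import json
import time
import urllib.request

# Standard AWS Step Functions terminal states
TERMINAL = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def get_etl_status(api_base: str, execution_id: str) -> str:
    """GET /v1/etl/status and return the execution_status field."""
    url = f"{api_base}/v1/etl/status?executionId={execution_id}"
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read())["execution_status"]


def wait_for_etl(api_base: str, execution_id: str,
                 interval_s: int = 30, max_polls: int = 20) -> str:
    """Poll until the execution reaches a terminal state or polls run out."""
    for _ in range(max_polls):
        status = get_etl_status(api_base, execution_id)
        if status in TERMINAL:
            return status
        time.sleep(interval_s)
    return "RUNNING"  # still in flight after max_polls
```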

**You can query the embeddings injected into AOS after the ETL process completes. Note that execution time depends largely on file size and count; estimate around 3-5 minutes per document. POST https://xxxx.execute-api.us-east-1.amazonaws.com/v1/aos**; other operations, including index, delete, and query, are also provided for debugging purposes.
```bash
BODY
16 changes: 16 additions & 0 deletions source/infrastructure/bin/main.ts
@@ -1,12 +1,16 @@
import { App, CfnOutput, CfnParameter, Stack, StackProps } from 'aws-cdk-lib';
import {Runtime, Code, LayerVersion} from 'aws-cdk-lib/aws-lambda';
import * as path from 'path';
import { Construct } from 'constructs';
import * as dotenv from "dotenv";
import { LLMApiStack } from '../lib/api/api-stack';
import { DynamoDBStack } from '../lib/ddb/ddb-stack';
import { EtlStack } from '../lib/etl/etl-stack';
import { AssetsStack } from '../lib/model/assets-stack';
import { LLMStack } from '../lib/model/llm-stack';
import { BuildConfig } from '../lib/shared/build-config';
import { VpcStack } from '../lib/shared/vpc-stack';
import { LambdaLayers } from '../lib/shared/lambda-layers';
import { OpenSearchStack } from '../lib/vector-store/os-stack';
import { ConnectorStack } from '../lib/connector/connector-stack';

@@ -16,6 +20,8 @@ export class RootStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps = {}) {
super(scope, id, props);

this.setBuildConfig();

// add cdk input parameters for user to specify s3 bucket store model assets
// using npx cdk deploy --rollback false --parameters S3ModelAssets=llm-rag --parameters [email protected] --parameters EtlImageName=etl-image to deploy
const _S3ModelAssets = new CfnParameter(this, 'S3ModelAssets', {
@@ -60,6 +66,10 @@ export class RootStack extends Stack {
type: 'String',
description: 'The ECR image name which is used for ETL, eg. etl-model',
});

const lambdaLayers = new LambdaLayers(this);
const _LambdaExecutorLayer = lambdaLayers.createExecutorLayer();
const _LambdaEmbeddingLayer = lambdaLayers.createEmbeddingLayer();

// This assest stack is to mitigate issue that the model assets in s3 bucket can't be located immediately to create sagemaker model
const _AssetsStack = new AssetsStack(this, 'assets-stack', {_s3ModelAssets:_S3ModelAssets.valueAsString, env:process.env});
@@ -136,6 +146,8 @@ export class RootStack extends Stack {
_jobDefinitionArn: _ConnectorStack._jobDefinitionArn,
_etlEndpoint: _EtlStack._etlEndpoint,
_resBucketName: _EtlStack._resBucketName,
_ApiLambdaExecutorLayer: _LambdaExecutorLayer,
_ApiLambdaEmbeddingLayer: _LambdaEmbeddingLayer,
env:process.env
});
_ApiStack.addDependency(_VpcStack);
@@ -160,6 +172,10 @@ export class RootStack extends Stack {
new CfnOutput(this, 'Processed Object Table', {value:_EtlStack._processedObjectsTable});
new CfnOutput(this, 'Chunk Bucket', {value:_EtlStack._resBucketName});
}

private setBuildConfig() {
BuildConfig.PIP_PARAMETER = this.node.tryGetContext('PipParameter') ?? '';
}
}

// for development, use account/region from cdk cli
3 changes: 3 additions & 0 deletions source/infrastructure/cdk.json
@@ -18,5 +18,8 @@
"node_modules"
]
},
"context": {
"@aws-cdk/customresources:installLatestAwsSdkDefault": false
},
"//": "~~ Generated by projen. To modify, edit .projenrc.js and run \"npx projen\"."
}
99 changes: 70 additions & 29 deletions source/infrastructure/lib/api/api-stack.ts
@@ -1,6 +1,5 @@
import { NestedStack, StackProps, Duration, Aws } from 'aws-cdk-lib';
import { DockerImageFunction, Handler } from 'aws-cdk-lib/aws-lambda';
import { DockerImageCode, Architecture } from 'aws-cdk-lib/aws-lambda';
import { Function, Runtime, Code, Architecture } from 'aws-cdk-lib/aws-lambda';
import * as iam from "aws-cdk-lib/aws-iam";
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as apigw from 'aws-cdk-lib/aws-apigateway';
@@ -15,6 +14,9 @@ import { WebSocketStack } from './websocket-api';
import { ApiQueueStack } from './api-queue';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';

// import { DockerImageFunction, Handler } from 'aws-cdk-lib/aws-lambda';
// import { DockerImageCode } from 'aws-cdk-lib/aws-lambda';

interface apiStackProps extends StackProps {
_vpc: ec2.Vpc;
_securityGroup: ec2.SecurityGroup;
@@ -32,6 +34,8 @@ interface apiStackProps extends StackProps {
_jobDefinitionArn: string;
_etlEndpoint: string;
_resBucketName: string;
_ApiLambdaExecutorLayer: lambda.LayerVersion;
_ApiLambdaEmbeddingLayer: lambda.LayerVersion;
}

export class LLMApiStack extends NestedStack {
@@ -52,6 +56,9 @@ export class LLMApiStack extends NestedStack {
const _jobDefinitionArn = props._jobDefinitionArn
const _etlEndpoint = props._etlEndpoint
const _resBucketName = props._resBucketName
const _ApiLambdaExecutorLayer = props._ApiLambdaExecutorLayer
const _ApiLambdaEmbeddingLayer = props._ApiLambdaEmbeddingLayer


const queueStack = new ApiQueueStack(this, 'LLMQueueStack');
const sqsStatement = queueStack.sqsStatement;
@@ -64,9 +71,11 @@
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
});

const lambdaExecutor = new DockerImageFunction(this,
const lambdaExecutor = new Function(this,
"lambdaExecutor", {
code: DockerImageCode.fromImageAsset(join(__dirname, "../../../lambda/executor")),
runtime: Runtime.PYTHON_3_11,
handler: "main.lambda_handler",
code: Code.fromAsset(join(__dirname, "../../../lambda/executor")),
timeout: Duration.minutes(15),
memorySize: 1024,
vpc: _vpc,
@@ -86,22 +95,7 @@
aos_index_dict: _aosIndexDict,
chat_session_table: _chatSessionTable,
},
});

const lambdaDispatcher = new DockerImageFunction(this,
"lambdaDispatcher", {
code: DockerImageCode.fromImageAsset(join(__dirname, "../../../lambda/dispatcher")),
timeout: Duration.minutes(15),
memorySize: 1024,
vpc: _vpc,
vpcSubnets: {
subnets: _vpc.privateSubnets,
},
securityGroups: [_securityGroup],
architecture: Architecture.X86_64,
environment: {
SQS_QUEUE_URL: messageQueue.queueUrl,
},
layers: [_ApiLambdaExecutorLayer]
});

lambdaExecutor.addToRolePolicy(new iam.PolicyStatement({
@@ -124,11 +118,32 @@
))
lambdaExecutor.addToRolePolicy(sqsStatement);
lambdaExecutor.addEventSource(new lambdaEventSources.SqsEventSource(messageQueue));


const lambdaDispatcher = new Function(this,
"lambdaDispatcher", {
runtime: Runtime.PYTHON_3_11,
handler: "main.lambda_handler",
code: Code.fromAsset(join(__dirname, "../../../lambda/dispatcher")),
timeout: Duration.minutes(15),
memorySize: 1024,
vpc: _vpc,
vpcSubnets: {
subnets: _vpc.privateSubnets,
},
securityGroups: [_securityGroup],
architecture: Architecture.X86_64,
environment: {
SQS_QUEUE_URL: messageQueue.queueUrl,
}
});
lambdaDispatcher.addToRolePolicy(sqsStatement);

const lambdaEmbedding = new DockerImageFunction(this,
const lambdaEmbedding = new Function(this,
"lambdaEmbedding", {
code: DockerImageCode.fromImageAsset(join(__dirname, "../../../lambda/embedding")),
runtime: Runtime.PYTHON_3_11,
handler: "main.lambda_handler",
code: Code.fromAsset(join(__dirname, "../../../lambda/embedding")),
timeout: Duration.minutes(15),
memorySize: 4096,
vpc: _vpc,
@@ -142,6 +157,7 @@
REGION: Aws.REGION,
RES_BUCKET: _resBucketName,
},
layers: [_ApiLambdaEmbeddingLayer]
});

lambdaEmbedding.addToRolePolicy(new iam.PolicyStatement({
@@ -158,9 +174,11 @@
}
))

const lambdaAos = new DockerImageFunction(this,
const lambdaAos = new Function(this,
"lambdaAos", {
code: DockerImageCode.fromImageAsset(join(__dirname, "../../../lambda/aos")),
runtime: Runtime.PYTHON_3_11,
handler: "main.lambda_handler",
code: Code.fromAsset(join(__dirname, "../../../lambda/aos")),
timeout: Duration.minutes(15),
memorySize: 1024,
vpc: _vpc,
@@ -173,6 +191,7 @@
opensearch_cluster_domain: _domainEndpoint,
embedding_endpoint: props._embeddingEndPoints[0],
},
layers: [_ApiLambdaEmbeddingLayer]
});

lambdaAos.addToRolePolicy(new iam.PolicyStatement({
@@ -190,7 +209,7 @@
))

const lambdaDdb = new lambda.Function(this, "lambdaDdb", {
runtime:lambda.Runtime.PYTHON_3_7,
runtime:lambda.Runtime.PYTHON_3_11,
handler: "rating.lambda_handler",
code: lambda.Code.fromAsset(join(__dirname, "../../../lambda/ddb")),
environment: {
@@ -275,10 +294,9 @@ export class LLMApiStack extends NestedStack {
// Integration with Step Function to trigger ETL process
// Lambda function to trigger Step Function
const lambdaStepFunction = new lambda.Function(this, 'lambdaStepFunction', {
// format to avoid indent error, using inline for simplicity no more container pack time needed
code: lambda.Code.fromAsset(join(__dirname, "../../../lambda/etl")),
handler: 'sfn_handler.handler',
runtime: lambda.Runtime.PYTHON_3_9,
runtime: lambda.Runtime.PYTHON_3_11,
timeout: Duration.seconds(30),
environment: {
sfn_arn: props._sfnOutput.stateMachineArn,
@@ -296,14 +314,37 @@
_S3Bucket.addEventNotification(s3.EventType.OBJECT_CREATED, new s3n.LambdaDestination(lambdaStepFunction), { prefix: 'documents/' });
_S3Bucket.grantReadWrite(lambdaStepFunction);

const lambdaGetETLStatus = new lambda.Function(this, "lambdaGetETLStatus", {
code: lambda.Code.fromAsset(join(__dirname, "../../../lambda/etl")),
handler: "get_status.lambda_handler",
runtime: lambda.Runtime.PYTHON_3_11,
timeout: Duration.minutes(5),
environment: {
sfn_arn: props._sfnOutput.stateMachineArn,
},
memorySize: 256,
});

lambdaGetETLStatus.addToRolePolicy(new iam.PolicyStatement({
actions: [
"states:DescribeExecution",
],
effect: iam.Effect.ALLOW,
resources: ['*'],
}));
const apiResourceETLStatus = apiResourceStepFunction.addResource("status")
apiResourceETLStatus.addMethod('GET', new apigw.LambdaIntegration(lambdaGetETLStatus));

const webSocketApi = new WebSocketStack(this, 'WebSocketApi', {
dispatcherLambda: lambdaDispatcher,
sendMessageLambda: lambdaExecutor,
});

const lambdaBatch = new DockerImageFunction(this,
const lambdaBatch = new Function(this,
"lambdaBatch", {
code: DockerImageCode.fromImageAsset(join(__dirname, "../../../lambda/batch")),
code: lambda.Code.fromAsset(join(__dirname, "../../../lambda/batch")),
handler: "main.lambda_handler",
runtime: lambda.Runtime.PYTHON_3_11,
timeout: Duration.minutes(15),
memorySize: 1024,
vpc: _vpc,
24 changes: 15 additions & 9 deletions source/infrastructure/lib/etl/etl-stack.ts
@@ -2,22 +2,22 @@ import { NestedStack, StackProps, RemovalPolicy, Duration, Aws } from 'aws-cdk-l
import { Construct } from 'constructs';

import * as iam from 'aws-cdk-lib/aws-iam';
import * as api from 'aws-cdk-lib/aws-apigateway';
import * as glue from '@aws-cdk/aws-glue-alpha';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as s3assets from 'aws-cdk-lib/aws-s3-assets';
import * as s3deploy from 'aws-cdk-lib/aws-s3-deployment';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sagemaker from 'aws-cdk-lib/aws-sagemaker';
import { DockerImageCode, Architecture, DockerImageFunction} from 'aws-cdk-lib/aws-lambda';
import { Function, Runtime, Code, Architecture } from 'aws-cdk-lib/aws-lambda';
import { join } from "path";
import { off } from 'process';

// import * as api from 'aws-cdk-lib/aws-apigateway';
// import { off } from 'process';
// import * as s3assets from 'aws-cdk-lib/aws-s3-assets';
// import * as s3deploy from 'aws-cdk-lib/aws-s3-deployment';

interface etlStackProps extends StackProps {
_vpc: ec2.Vpc;
@@ -53,9 +53,13 @@ export class EtlStack extends NestedStack {
iam.ManagedPolicy.fromAwsManagedPolicyName('CloudWatchLogsFullAccess'),
],
});

const imageUrlDomain = (this.region === 'cn-north-1' || this.region === 'cn-northwest-1')
? '.amazonaws.com.cn/'
: '.amazonaws.com/';

// Create model, BucketDeployment construct automatically handles dependencies to ensure model assets uploaded before creating the model in this.region
const imageUrl = this.account + '.dkr.ecr.' + this.region +'.amazonaws.com/' + props._imageName + ":" + props._etlTag;
const imageUrl = this.account + '.dkr.ecr.' + this.region + imageUrlDomain + props._imageName + ":" + props._etlTag;
const model = new sagemaker.CfnModel(this, 'etl-model', {
executionRoleArn: endpointRole.roleArn,
primaryContainer: {
@@ -197,9 +201,11 @@
topic.addSubscription(new subscriptions.EmailSubscription(props._subEmail));

// Lambda function to for file deduplication and glue job allocation based on file number
const lambdaETL = new DockerImageFunction(this,
const lambdaETL = new Function(this,
"lambdaETL", {
code: DockerImageCode.fromImageAsset(join(__dirname, "../../../lambda/etl")),
code: Code.fromAsset(join(__dirname, "../../../lambda/etl")),
handler: "main.lambda_handler",
runtime: Runtime.PYTHON_3_11,
timeout: Duration.minutes(15),
memorySize: 1024,
architecture: Architecture.X86_64,
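The heart of this PR's China-region support is the ECR domain switch in `EtlStack` above: image URIs in `cn-north-1` and `cn-northwest-1` use the `.amazonaws.com.cn` domain. The same rule as a standalone sketch (a hypothetical helper mirroring the TypeScript logic):

```python
# China-partition regions use the .amazonaws.com.cn domain for ECR
CN_REGIONS = {"cn-north-1", "cn-northwest-1"}


def ecr_image_uri(account: str, region: str, image_name: str, tag: str) -> str:
    """Build the ECR image URI, switching domains for China regions."""
    domain = ".amazonaws.com.cn" if region in CN_REGIONS else ".amazonaws.com"
    return f"{account}.dkr.ecr.{region}{domain}/{image_name}:{tag}"


print(ecr_image_uri("123456789012", "cn-north-1", "etl-model", "latest"))
# 123456789012.dkr.ecr.cn-north-1.amazonaws.com.cn/etl-model:latest
```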