
Commit 4de0877

Merge branch 'main' into fix/add-retry-logic-pipeline-components
2 parents 2112e93 + c7175b8 commit 4de0877

9 files changed: 50 additions & 116 deletions

gfmstudio/fine_tuning/api.py

Lines changed: 1 addition & 15 deletions
@@ -110,20 +110,6 @@ def parse_hpo_form(tune_metadata: str = Form(...)) -> schemas.TuneSubmitHPO:
 inference_crud = crud.ItemCrud(model=Inference)
 inference_model_crud = crud.ItemCrud(model=InferenceModel)
 
-"""
-Create file folders if they don't exist already
-"""
-if settings.ENVIRONMENT.lower() in ["local", "crc"]:
-    tasks_dir = os.path.join(settings.TUNE_BASEDIR, "tune-tasks")
-    if os.path.isdir(tasks_dir) is False:
-        logger.info(f"Creating tasks directory: {tasks_dir}")
-        os.makedirs(tasks_dir)
-
-    trained_dir = os.path.join(settings.TUNE_BASEDIR, "pre-trained")
-    if os.path.isdir(trained_dir) is False:
-        logger.info(f"Creating pre-trained directory: {trained_dir}")
-        os.makedirs(trained_dir)
-
 
 ###############################################
 # ---- Get token endpoint
@@ -996,7 +982,7 @@ async def submit_hpo_tune_yaml(
     bucket_key = f"tune-tasks/{tune_id}/{tune_id}_config.yaml"
     tune_dir = os.path.join(settings.TUNE_BASEDIR, f"tune-tasks/{tune_id}")
     if os.path.isdir(tune_dir) is False:
-        os.mkdir(tune_dir)
+        os.makedirs(tune_dir, exist_ok=True)
 
     bucket_dir = os.path.join(settings.TUNE_BASEDIR, bucket_key)
     config_data = yaml.load(config_content, Loader=yaml.SafeLoader)
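
The os.mkdir to os.makedirs(..., exist_ok=True) swap matters because os.mkdir raises FileNotFoundError when an intermediate directory is missing and FileExistsError when the target already exists (for example, when two requests for the same tune_id race). A minimal sketch of the difference, using a hypothetical base path rather than the real TUNE_BASEDIR:

import os

base = "/tmp/tune-basedir"  # hypothetical TUNE_BASEDIR, for illustration only
tune_dir = os.path.join(base, "tune-tasks/example-tune-id")

# os.mkdir(tune_dir) could fail in two ways here:
#   FileNotFoundError if "tune-tasks" does not exist yet,
#   FileExistsError if the directory was already created by an earlier call.
os.makedirs(tune_dir, exist_ok=True)  # creates intermediate dirs and is idempotent on re-run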

gfmstudio/fine_tuning/deployment/k8-dataset-onboarding-v2-pipeline.tpl.yaml

Lines changed: 2 additions & 1 deletion
@@ -9,6 +9,7 @@ metadata:
 spec:
   template:
     spec:
+      serviceAccountName: api-gateway-sa
       imagePullSecrets:
         - name: IMAGE_PULL_SECRET
       containers:
@@ -17,7 +18,7 @@ spec:
         imagePullPolicy: Always
         volumeMounts:
           - name: mount-data
-            mountPath: /data
+            mountPath: /pipeline/data
         env:
           - name: LOGLEVEL
             valueFrom:

gfmstudio/fine_tuning/utils/dataset_handlers.py

Lines changed: 1 addition & 1 deletion
@@ -269,7 +269,7 @@ def obtain_job_log(pod_name: str, dataset_id: str) -> str:
     destination: str
         the filepath at which the job log is stored
     """
-    logs_dir = "deployment/logs"
+    logs_dir = "/app/deployment/logs"
     os.makedirs(logs_dir, exist_ok=True)
     log_path = f"{logs_dir}/{dataset_id}.log"
     try:

gfmstudio/fine_tuning/utils/tune_handlers.py

Lines changed: 0 additions & 17 deletions
@@ -655,23 +655,6 @@ async def save_tune_config(
     HTTPException
         500 if COS upload fails
     """
-    # Upload to COS if not local environment
-    if settings.ENVIRONMENT.lower() not in ["local", "crc"]:
-        s3 = object_storage.object_storage_client()
-        try:
-            await asyncify(s3.put_object)(
-                Bucket=settings.TUNES_FILES_BUCKET,
-                Body=rendered_template,
-                Key=bucket_key,
-            )
-            logger.debug(
-                f"Config uploaded to COS: {settings.TUNES_FILES_BUCKET}/{bucket_key}"
-            )
-        except Exception as exc:
-            logger.exception("Failed to upload config to COS")
-            raise HTTPException(
-                status_code=500, detail="Failed to upload configuration to storage"
-            ) from exc
 
     # Save to local storage
     tune_dir = os.path.join(settings.TUNE_BASEDIR, f"tune-tasks/{tune_id}")

gfmstudio/fine_tuning/utils/webhook_event_handlers.py

Lines changed: 9 additions & 1 deletion
@@ -319,7 +319,15 @@ async def handle_dataset_factory_webhooks(
         f"kubectl delete job onboarding-v2-pipeline-{dataset_id}"
     )
     k8s_delete_secret_command = f"kubectl delete secret dataset-onboarding-v2-pipeline-params-{dataset_id}"
-    remove_job_deployment_file_command = f"rm {BASE_DIR}/deployment/jobs/onboarding-v2-pipeline-{dataset_id}.yaml"
+
+    # Validate BASE_DIR before using it in rm command
+    if not BASE_DIR or str(BASE_DIR).strip() == "":
+        logger.info(f"BASE_DIR is empty or invalid: '{BASE_DIR}'. Try using /app as BASE_DIR")
+        deployment_file_path = f"/app/deployment/jobs/onboarding-v2-pipeline-{dataset_id}.yaml"
+        remove_job_deployment_file_command = f"rm -f {deployment_file_path}"
+    else:
+        deployment_file_path = f"{BASE_DIR}/deployment/jobs/onboarding-v2-pipeline-{dataset_id}.yaml"
+        remove_job_deployment_file_command = f"rm -f {deployment_file_path}"
 
     try:
         delete_job_output = subprocess.check_output(
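
The new branch guards against an empty BASE_DIR before composing the rm command, falling back to /app so the cleanup never targets a path rooted at "/". A minimal sketch of that fallback as a standalone helper; the function name resolve_deployment_file and the example paths are hypothetical, not part of the diff:

def resolve_deployment_file(base_dir, dataset_id: str) -> str:
    """Return the job manifest path, falling back to /app when base_dir is unset or blank."""
    root = str(base_dir).strip() if base_dir else ""
    if not root:
        root = "/app"  # assumed container working directory, mirroring the fallback in the diff
    return f"{root}/deployment/jobs/onboarding-v2-pipeline-{dataset_id}.yaml"

# Both calls produce a concrete file path for "rm -f", never a bare "/deployment/..." target.
print(resolve_deployment_file(None, "ds-123"))        # /app/deployment/jobs/onboarding-v2-pipeline-ds-123.yaml
print(resolve_deployment_file("/srv/api", "ds-123"))  # /srv/api/deployment/jobs/onboarding-v2-pipeline-ds-123.yaml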

gfmstudio/inference/services.py

Lines changed: 30 additions & 75 deletions
@@ -123,84 +123,39 @@ async def invoke_tune_upload_handler(
         logger.exception(f"Network error during download:{e}")
         raise
 
-    if settings.ENVIRONMENT.lower() in ["local", "crc"]:
-        tune_dir = os.path.join(settings.TUNE_BASEDIR, f"tune-tasks/{tune_id}")
-        if os.path.isdir(tune_dir) is False:
-            os.mkdir(tune_dir)
+    tune_dir = os.path.join(settings.TUNE_BASEDIR, f"tune-tasks/{tune_id}")
+    if os.path.isdir(tune_dir) is False:
+        os.makedirs(tune_dir, exist_ok=True)
 
-        tune_config_deploy_bucket_dir = os.path.join(
-            settings.TUNE_BASEDIR, tune_config_deploy_bucket_key
-        )
-        tune_config_bucket_dir = os.path.join(
-            settings.TUNE_BASEDIR, tune_config_bucket_key
-        )
-        tune_checkpoint_bucket_dir = os.path.join(
-            settings.TUNE_BASEDIR, tune_checkpoint_bucket_key
-        )
-
-        if tune_config_url[0:4] == "http":
-            with open(tune_config_deploy_bucket_dir, "w") as config_file:
-                config_file.write(tune_config_response.text)
-            with open(tune_config_bucket_dir, "w") as config_file:
-                config_file.write(tune_config_response.text)
-
-        elif tune_config_url[0:4] == "file":
-            if not os.path.exists(tune_config_deploy_bucket_dir):
-                shutil.copyfile(tune_config_url[7:], tune_config_deploy_bucket_dir)
-            if not os.path.exists(tune_config_bucket_dir):
-                shutil.copyfile(tune_config_url[7:], tune_config_bucket_dir)
-
-        if tune_checkpoint_url[0:4] == "http":
-            with open(tune_checkpoint_bucket_dir, "wb") as checkpoint_file:
-                checkpoint_file.write(tune_checkpoint_response.content)
-        elif tune_checkpoint_url[0:4] == "file":
-            if not os.path.exists(tune_checkpoint_bucket_dir):
-                shutil.copyfile(tune_checkpoint_url[7:], tune_checkpoint_bucket_dir)
+    tune_config_deploy_bucket_dir = os.path.join(
+        settings.TUNE_BASEDIR, tune_config_deploy_bucket_key
+    )
+    tune_config_bucket_dir = os.path.join(
+        settings.TUNE_BASEDIR, tune_config_bucket_key
+    )
+    tune_checkpoint_bucket_dir = os.path.join(
+        settings.TUNE_BASEDIR, tune_checkpoint_bucket_key
+    )
 
-    else:
-        pipelines_bucket_name = settings.TUNES_FILES_BUCKET
-        try:
-            logger.info(f"Connect to cos bucket{pipelines_bucket_name}")
-            pipeline_s3_client = boto3.client(
-                "s3",
-                aws_access_key_id=settings.OBJECT_STORAGE_KEY_ID,
-                aws_secret_access_key=settings.OBJECT_STORAGE_SEC_KEY,
-                endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
-                config=Config(
-                    signature_version=settings.OBJECT_STORAGE_SIGNATURE_VERSION
-                ),
-                verify=(settings.ENVIRONMENT.lower() not in ["local", "crc"]),
-            )
-        except ValueError as exc:
-            logger.error(f"pipeline_s3_client Misconfiguration: {str(exc)}")
-
-        try:
-            logger.info("Uploading tune config file")
-            # Uploading both config_deploy.yaml and {tune_id}_config.yaml
-            pipeline_s3_client.upload_fileobj(
-                tune_config_response.raw,
-                Bucket=pipelines_bucket_name,
-                Key=tune_config_deploy_bucket_key,
-            )
-            pipeline_s3_client.upload_fileobj(
-                tune_config_response.raw,
-                Bucket=pipelines_bucket_name,
-                Key=tune_config_bucket_key,
-            )
-            logger.info("Uploading tune config Succeeded")
+    if tune_config_url[0:4] == "http":
+        with open(tune_config_deploy_bucket_dir, "w") as config_file:
+            config_file.write(tune_config_response.text)
+        with open(tune_config_bucket_dir, "w") as config_file:
+            config_file.write(tune_config_response.text)
+
+    elif tune_config_url[0:4] == "file":
+        if not os.path.exists(tune_config_deploy_bucket_dir):
+            shutil.copyfile(tune_config_url[7:], tune_config_deploy_bucket_dir)
+        if not os.path.exists(tune_config_bucket_dir):
+            shutil.copyfile(tune_config_url[7:], tune_config_bucket_dir)
+
+    if tune_checkpoint_url[0:4] == "http":
+        with open(tune_checkpoint_bucket_dir, "wb") as checkpoint_file:
+            checkpoint_file.write(tune_checkpoint_response.content)
+    elif tune_checkpoint_url[0:4] == "file":
+        if not os.path.exists(tune_checkpoint_bucket_dir):
+            shutil.copyfile(tune_checkpoint_url[7:], tune_checkpoint_bucket_dir)
 
-            logger.info("uploading tune checkpoint file")
-            pipeline_s3_client.upload_fileobj(
-                tune_checkpoint_response.raw,
-                Bucket=pipelines_bucket_name,
-                Key=tune_checkpoint_bucket_key,
-            )
-        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
-            logger.exception(f"Network error during download:{e}")
-            raise
-        except ClientError as e:
-            logger.exception(f"Failed to upload files to cos: {e}")
-            raise
     logger.info("Updating the tune with {tune_id} status to Finished")
     tunes_crud.update(
         db=db,
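
With the COS upload branch removed, the handler always materializes the config and checkpoint under TUNE_BASEDIR and dispatches on the URL prefix: http(s) responses are written out, while file:// URLs are copied (url[7:] strips the "file://" prefix). A minimal standalone sketch of that dispatch; the helper name materialize and the example paths are hypothetical, not part of the diff:

import os
import shutil

def materialize(source_url: str, response_text: str, destination: str) -> None:
    """Write an http(s) download or copy a file:// source to destination (illustrative only)."""
    if source_url[0:4] == "http":
        with open(destination, "w") as fh:
            fh.write(response_text)
    elif source_url[0:4] == "file":
        if not os.path.exists(destination):
            shutil.copyfile(source_url[7:], destination)  # "file://" is 7 characters

# e.g. materialize("file:///tmp/config.yaml", "", "/tmp/tune-basedir/tune-tasks/t1/t1_config.yaml")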

pipelines/Dockerfile

Lines changed: 2 additions & 2 deletions
@@ -99,8 +99,8 @@ RUN mkdir /pipeline/data \
     && mkdir /.docker && chmod -R 777 /.docker \
     && mkdir /.local && chmod -R 777 /.local \
     && mkdir /.cache && chmod -R 777 /.cache \
-    && chown -R 10001:10001 /pipeline/ \
-    && chmod -R 777 /pipeline/
+    && chown -R 10001:10001 /pipeline/ /pipeline/data \
+    && chmod -R 777 /pipeline/ /pipeline/data
 
 COPY ./components/terrakit_data_fetch/sentinelhub_config.toml /.config/sentinelhub/config.toml
 RUN chmod -R 777 /.config/sentinelhub/

pipelines/components/curated_upload/claimed_curated_upload_v2.py

Lines changed: 4 additions & 3 deletions
@@ -714,7 +714,7 @@ def main():
 
     onboarding_details = {}
 
-    working_path = "/data/" + payload["dataset_id"]
+    working_path = "/pipeline/data/" + payload["dataset_id"]
 
     dataset_bucket = os.getenv("DATA_BUCKET", "geoft-service-datasets")
 
@@ -895,9 +895,10 @@ def main():
         main()
     except Exception as e:
        logger.error(
-            "An exception occurred when onboarding the dataset", stack_info=True
+            f"An exception occurred when onboarding the dataset: {str(e)}",
+            exc_info=True,
+            stack_info=True
        )
-        logger.error("Exception - " + str(e))
        error["message"] = str(e)
        onboarding_details = {}
        populate_onboarding_details(
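
Passing exc_info=True makes the logging module attach the active exception and its traceback to the record, which is why the two separate logger.error calls can collapse into one. A short standalone illustration, not taken from the component itself:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("onboarding-demo")

try:
    raise ValueError("bad dataset payload")  # stand-in for a failing main()
except Exception as e:
    # One call now carries the message, the exception text, and the full traceback.
    logger.error(f"An exception occurred when onboarding the dataset: {str(e)}", exc_info=True)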

pipelines/components/push_to_geoserver/push_to_geoserver_helper_functions.py

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ def add_imagemosaic_to_geoserver(geo, workspace, task_folder, layer_name, retrie
     logger.debug(f"Save to geoserver create coverage store for file: {retrieved_file_paths}")
 
     layer_folder = f"{task_folder}/{layer_name}"
-    os.mkdir(layer_folder)
+    os.makedirs(layer_folder, exist_ok=True)
     for f in retrieved_file_paths:
         shutil.copy(f, f"{layer_folder}/")
 