
Task caching doesn't work when using default policy if an input is a result from another task that is a pandas dataframe #17547

@jbnitorum

Description


Bug summary

I have noticed when retrying failed flow runs that many of the tasks in the flow rerun despite having completed successfully and having a persisted result. Looking into it further, the issue appears to occur when one or more of the inputs to a task is the result of another task and that result is a pandas dataframe. We recently migrated from Prefect 2 to Prefect 3 and this was never an issue previously, so let me know if I am missing a behavior change. I can see the persisted dataframe result on S3, so I suspect this is an issue related to serialization.
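A quick diagnostic sketch I put together (my own, not from Prefect docs): it assumes the default INPUTS cache policy keys off prefect.utilities.hashing.hash_objects, which is my assumption rather than something confirmed here. If hashing a dataframe input returns None or an unstable value, that would explain the cache misses.

import pandas as pd
from prefect.utilities.hashing import hash_objects

df = pd.DataFrame({"a": [1, 2, 3]})

# hash_objects returns None when it cannot produce a hash for the inputs,
# which would force a cache miss on every run.
key_1 = hash_objects({"df_or_string": df})
key_2 = hash_objects({"df_or_string": df.copy()})
print(key_1, key_2, key_1 == key_2)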

We have the following settings:

PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
PREFECT_DEFAULT_RESULT_STORAGE_BLOCK='s3-bucket/my-bucket-block'
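For reference, the storage block named in PREFECT_DEFAULT_RESULT_STORAGE_BLOCK was registered roughly as in the sketch below; the bucket name and credentials wiring are placeholders for our actual setup, not part of the reproduction.

from prefect_aws import AwsCredentials, S3Bucket

# Placeholder names; in our environment credentials come from the ECS task role.
credentials = AwsCredentials()
S3Bucket(bucket_name="my-bucket", credentials=credentials).save("my-bucket-block")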

Here is an example. We are running this on AWS ECS Fargate and using Prefect Cloud. On an initial run there should be cache hits on tasks 7, 8, and 9, and on a retry of the flow there should be cache hits on all tasks. What I'm actually seeing is cache hits only on 7 and 9 on the initial run, and on 1, 4, 5, 6, 7, and 9 on the retry, with misses on 2, 3, and 8.

from prefect import flow, task, get_run_logger
import pandas as pd
import boto3
import requests
import io

@task(name="Getting data")
def extract(return_df=True):
    if return_df:
        url = 'https://www.federalreserve.gov/datadownload/Output.aspx?rel=H8&series=dd48166f12d986aede821fb86d9185d7&lastobs=&from=&to=&filetype=csv&label=include&layout=seriescolumn&type=package'
        r = requests.get(url)
        df = pd.read_csv(io.StringIO(r.content.decode('utf-8')))
        return df
    else:
        return 'this is a string'

@task(name="Doing something with data")
def transform(df_or_string):
    if isinstance(df_or_string, str):
        print('this is a string')
        the_string = df_or_string + ' more string'
        return the_string
    else:
        df = df_or_string
        df['new_col'] = 'Add this constant'
        return df

@task(name="Log output of transform")
def load(df):
    logger = get_run_logger()
    logger.info(df)
    
@flow(name="Retry Test")
def main_flow():
    data_frame = extract(return_df = True)  ### caches on retry
    df_transformed = transform(data_frame) ### DOES NOT CACHE ON RETRY BECAUSE IT HAS AN INPUT THAT IS A DATAFRAME TYPE CACHED TASK RESULT
    _ = load(df_transformed) ### DOES NOT CACHE ON RETRY BECAUSE IT HAS AN INPUT THAT IS A DATAFRAME TYPE CACHED TASK RESULT

    string_text = extract(return_df = False) ### caches on retry
    longer_string = transform(string_text) ### caches on retry
    _ = load(longer_string) ### caches on retry

    data_frame2 = extract(return_df = True)  ### this caches on initial run and on retry
    df_transformed2 = transform(data_frame2) ### DOES NOT CACHE ON INITIAL RUN OR RETRY BECAUSE IT HAS AN INPUT THAT IS A DATAFRAME TYPE CACHED TASK RESULT
    _ = load(df_transformed2) ### THIS CACHES ON INITIAL RUN AND ON RETRY... not sure why this is different than the line above?

if __name__ == "__main__":
    main_flow()
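As a possible workaround (my own sketch, not a confirmed fix), the dataframe-consuming tasks can be given an explicit cache_key_fn that hashes the dataframe contents deterministically, so the cache key no longer depends on how Prefect serializes the input. The helper name df_aware_cache_key is mine.

import hashlib
import pandas as pd
from prefect import task

def df_aware_cache_key(context, parameters):
    """Build a cache key by hashing dataframe contents deterministically."""
    digest = hashlib.sha256()
    for name, value in sorted(parameters.items()):
        if isinstance(value, pd.DataFrame):
            digest.update(pd.util.hash_pandas_object(value, index=True).values.tobytes())
        else:
            digest.update(repr(value).encode())
    return digest.hexdigest()

@task(name="Doing something with data", cache_key_fn=df_aware_cache_key)
def transform(df_or_string):
    # same body as the transform task above
    ...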

Version info

Version:             3.2.7
API version:         0.8.4
Python version:      3.10.16
Git commit:          d4d9001e
Built:               Fri, Feb 21, 2025 7:41 PM
OS/Arch:             linux/x86_64
Profile:             ephemeral
Server type:         ephemeral
Pydantic version:    2.10.6
Server:
  Database:          sqlite
  SQLite version:    3.40.1
Integrations:
  prefect-aws:       0.5.5
  prefect-dask:      0.3.3
  prefect-dbt:       0.7.0rc1
  prefect-shell:     0.3.1
  prefect-snowflake: 0.28.2
  prefect-redis:     0.2.2

Additional context

No response

Labels: bug (Something isn't working)