
handle missing projects #125

Open
andersy005 opened this issue Sep 11, 2024 · 0 comments

i recently encountered an issue where some projects that were previously listed in the registries are no longer available. this breaks our offsetsDB data ingestion pipeline due to the following constraint:

  • every project in the credit and clip tables must have a corresponding project record in the project table.

whenever a project is listed, picked up in weekly summary clips or curated clips, and then delisted from the registry, we end up with a broken ingestion pipeline. for example, back in May, ACR988 was added to the database; however, this project no longer shows up in the data we download from ACR.
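the failure mode can be demonstrated with a tiny, self-contained sketch, using sqlite3 as a stand-in for the actual offsets-db schema (the table and column names here are illustrative only, not the real schema):

```python
import sqlite3

# in-memory database with a foreign key from credit -> project,
# mirroring the constraint described above
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA foreign_keys = ON')  # sqlite needs this to enforce FKs
conn.execute('CREATE TABLE project (project_id TEXT PRIMARY KEY)')
conn.execute(
    'CREATE TABLE credit ('
    '  id INTEGER PRIMARY KEY,'
    '  project_id TEXT NOT NULL REFERENCES project(project_id)'
    ')'
)
conn.execute("INSERT INTO project (project_id) VALUES ('ACR123')")

# inserting a credit for a known project works ...
conn.execute("INSERT INTO credit (project_id) VALUES ('ACR123')")

# ... but a credit for a project that was delisted from the registry
# (and so never landed in the project table) violates the foreign key
try:
    conn.execute("INSERT INTO credit (project_id) VALUES ('ACR988')")
    failed = False
except sqlite3.IntegrityError:
    failed = True

print(failed)  # True
```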

to address this edge case, i made some changes to ensure that placeholder projects are created (using data that we can easily derive from a project_id, e.g., registry, details_url) for missing project IDs.
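for context, the registry can be inferred from the project ID's prefix. a rough, hypothetical sketch of what `get_registry_from_project_id` could look like (the actual helper in offsets-db-data may differ):

```python
# hypothetical prefix -> registry mapping; mirrors the URL table used in
# the fix below, but is not necessarily the real implementation
PREFIX_TO_REGISTRY = {
    'VCS': 'verra',
    'GS': 'gold-standard',
    'ACR': 'american-carbon-registry',
    'CAR': 'climate-action-reserve',
    'ART': 'art-trees',
}


def get_registry_from_project_id(project_id: str) -> str:
    """Derive the registry name from a project ID like 'ACR988'."""
    for prefix, registry in PREFIX_TO_REGISTRY.items():
        if project_id.upper().startswith(prefix):
            return registry
    return 'unknown'


print(get_registry_from_project_id('ACR988'))  # american-carbon-registry
```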

in #124, i made the following changes to handle this edge case:

```python
import pandas as pd
from sqlalchemy import text
from sqlalchemy.exc import IntegrityError
from sqlmodel import Session, col, select

# Project, get_registry_from_project_id, get_session, and logger come from
# elsewhere in offsets-db-data


def ensure_projects_exist(df: pd.DataFrame, session: Session) -> None:
    """
    Ensure all project IDs in the dataframe exist in the database.
    If not, create placeholder projects for missing IDs.
    """
    logger.info('🔍 Checking for missing project IDs')

    # Get all unique project IDs from the dataframe
    credit_project_ids = df['project_id'].unique()

    # Query existing project IDs
    existing_project_ids = set(
        session.exec(
            select(Project.project_id).where(col(Project.project_id).in_(credit_project_ids))
        ).all()
    )

    # Identify missing project IDs
    missing_project_ids = set(credit_project_ids) - existing_project_ids

    logger.info(f'🔍 Found {len(existing_project_ids)} existing project IDs')
    logger.info(f'🔍 Found {len(missing_project_ids)} missing project IDs: {missing_project_ids}')

    # Create placeholder projects for missing IDs
    urls = {
        'verra': 'https://registry.verra.org/app/projectDetail/VCS/',
        'gold-standard': 'https://registry.goldstandard.org/projects?q=gs',
        'american-carbon-registry': 'https://acr2.apx.com/mymodule/reg/prjView.asp?id1=',
        'climate-action-reserve': 'https://thereserve2.apx.com/mymodule/reg/prjView.asp?id1=',
        'art-trees': 'https://art.apx.com/mymodule/reg/prjView.asp?id1=',
    }
    for project_id in missing_project_ids:
        registry = get_registry_from_project_id(project_id)
        # reset url on every iteration so an unknown registry doesn't reuse
        # (or, on the first iteration, reference) a stale value
        url = None
        if base_url := urls.get(registry):
            url = f'{base_url}{project_id[3:]}'
        placeholder_project = Project(
            project_id=project_id,
            registry=registry,
            category=['unknown'],
            protocol=['unknown'],
            project_url=url,
        )
        session.add(placeholder_project)

    try:
        session.commit()
        logger.info(f'✅ Added {len(missing_project_ids)} missing project IDs to the database')
    except IntegrityError as exc:
        session.rollback()
        logger.error(f'❌ Error creating placeholder projects: {exc}')
        raise


def process_dataframe(df, table_name, engine, dtype_dict=None):
    logger.info(f'📝 Writing DataFrame to {table_name}')
    logger.info(f'engine: {engine}')

    with engine.begin() as conn:
        if engine.dialect.has_table(conn, table_name):
            # Instead of dropping the table (which would override data types
            # and the schema), delete all rows.
            conn.execute(text(f'TRUNCATE TABLE {table_name} RESTART IDENTITY CASCADE;'))

        if table_name in {'credit', 'clipproject'}:
            session = next(get_session())
            try:
                logger.info(f'Processing data destined for {table_name} table...')
                ensure_projects_exist(df, session)
            except IntegrityError:
                logger.error('❌ Failed to ensure projects exist. Continuing with data insertion.')

        # write the data
        df.to_sql(table_name, conn, if_exists='append', index=False, dtype=dtype_dict)
```

this change ensures that our database remains consistent even when projects are unlisted from the registries.
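stripped of the database session, the core of the placeholder logic is just a set difference plus fields derivable from the project_id itself; a minimal sketch (the IDs here are made up):

```python
# toy inputs -- in the real pipeline these come from the credits dataframe
# and a query against the project table
credit_project_ids = {'VCS1', 'ACR988', 'CAR100'}
existing_project_ids = {'VCS1', 'CAR100'}

# the IDs that need placeholder projects
missing_project_ids = credit_project_ids - existing_project_ids
print(sorted(missing_project_ids))  # ['ACR988']

urls = {'american-carbon-registry': 'https://acr2.apx.com/mymodule/reg/prjView.asp?id1='}
for project_id in missing_project_ids:
    registry = 'american-carbon-registry'  # via get_registry_from_project_id
    url = None
    if base_url := urls.get(registry):
        # the numeric portion follows the three-letter registry prefix
        url = f'{base_url}{project_id[3:]}'
    print(url)  # https://acr2.apx.com/mymodule/reg/prjView.asp?id1=988
```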
