[Question]: Ingestion pipeline duplicates with Postgres #13461

Closed

kyouens opened this issue May 13, 2024 · 3 comments

Labels
question Further information is requested

Comments

kyouens commented May 13, 2024

Question Validation

  • I have searched both the documentation and discord for an answer.

Question

Hello,

I've got a question that I suspect is a configuration issue rather than a bug. When I run my ingestion pipeline (code below) repeatedly on the same set of source documents, my vector database increases in size on each run, suggesting that some of the information is being duplicated rather than upserted. I believe I have the vector store, docstore, and cache set up correctly with Postgres, as they are populated on ingestion and queries work fine. I've reviewed these issues: 1, 2. In both it's suggested that the docstore be explicitly saved and loaded. However, I don't know if that applies in my case, since I am using a database rather than a local directory.
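(For context, the explicit save/load suggested in those issues applies to the local, in-memory SimpleDocumentStore; a rough sketch of that pattern is below, with the persist path chosen arbitrarily. A database-backed PostgresDocumentStore writes through to the database on every update, so that step should not be needed here.)

from llama_index.core.storage.docstore import SimpleDocumentStore

docstore = SimpleDocumentStore()
# ... build the IngestionPipeline with docstore=docstore and run it ...

# After the run, write the docstore to disk so the next run can see prior doc IDs/hashes.
docstore.persist(persist_path="./pipeline_storage/docstore.json")

# On the next run, reload it before building the pipeline.
docstore = SimpleDocumentStore.from_persist_path("./pipeline_storage/docstore.json")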

I'd be very appreciative of any help, either in advancing my understanding of how repeated ingestions on the same source material are supposed to work or in directly fixing my problem. Thanks in advance.

Perform ingestion:

from llama_index.core import SimpleDirectoryReader
from llama_index.core.ingestion import IngestionPipeline

vector_store = get_vector_store()
cache = get_cache()
docstore = get_docstore()


def perform_ingestion():

    try:
        documents = SimpleDirectoryReader(
            local_vault_copy_path,
            recursive=True,
            required_exts=included_extensions,
            file_metadata=get_meta,
        ).load_data()

        pipeline = IngestionPipeline(
            transformations=transformations,
            vector_store=vector_store,
            cache=cache,
            docstore=docstore
        )

        pipeline.run(documents=documents)
        return "Ingestion completed successfully."
    except Exception as e:
        return f"Ingestion failed with error: {e}"

Functions to create the document store, vector store, cache, and storage context:

def get_vector_store():
    conn = get_database_connection()
    full_database_url = (
        f"{config['Database']['Connection string']}/{config['Database']['Name']}"
    )
    url = make_url(https://rs.http3.lol/index.php?q=ZnVsbF9kYXRhYmFzZV91cmw)

    vector_store = PGVectorStore.from_params(
        database=url.database,
        host=url.host,
        password=url.password,
        port=url.port,
        user=url.username,
        table_name=config["Database"]["Vector table name"],
        embed_dim=config["Ollama settings"]["Embedding dimensions"],
    )
    return vector_store


def get_docstore():
    conn = get_database_connection()
    full_database_url = (
        f"{config['Database']['Connection string']}/{config['Database']['Name']}"
    )
    url = make_url(https://rs.http3.lol/index.php?q=ZnVsbF9kYXRhYmFzZV91cmw)

    docstore = PostgresDocumentStore.from_params(
        database=url.database,
        host=url.host,
        password=url.password,
        port=url.port,
        user=url.username,
        table_name=config["Database"]["Document store table name"],
    )
    return docstore


def get_storage_context():
    vector_store = get_vector_store()
    docstore = get_docstore()
    storage_context = StorageContext.from_defaults(
        vector_store=vector_store, docstore=docstore
    )
    return storage_context


def get_cache():
    conn = get_database_connection()
    full_database_url = (
        f"{config['Database']['Connection string']}/{config['Database']['Name']}"
    )
    url = make_url(https://rs.http3.lol/index.php?q=ZnVsbF9kYXRhYmFzZV91cmw)

    cache = IngestionCache(
        cache=PGCache.from_params(
            database=url.database,
            host=url.host,
            password=url.password,
            port=url.port,
            user=url.username,
            table_name=config["Database"]["Cache table name"],
        )
    )
    return cache
@kyouens kyouens added the question Further information is requested label May 13, 2024
logan-markewich (Collaborator) commented May 13, 2024

Do the documents have the same document ID when you run them the second time? This ID needs to be constant across runs in order to upsert properly.
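For illustration, one way to satisfy that is to overwrite each Document's id_ with something stable, such as its source path, before running the pipeline. The set_stable_ids helper below is a sketch, not part of the code above, and assumes the reader put file_path into the metadata (SimpleDirectoryReader does this by default):

def set_stable_ids(documents):
    # Hypothetical helper: reuse the source file path as the document ID so the
    # same file maps to the same ID on every run, allowing a true upsert.
    for doc in documents:
        file_path = doc.metadata.get("file_path")
        if file_path:
            doc.id_ = file_path
    return documents

pipeline.run(documents=set_stable_ids(documents))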

kyouens (Author) commented May 14, 2024

Thanks @logan-markewich. For some reason, they do not. I tested the ingestion on a single file: running it twice, a few seconds apart, without modifying the file between runs, gives me two different IDs in the docstore table.

First ingestion

{"doc_hash": "d87e7ab6dc1a238af8719edc739f337efa095a9f1414e315b95665b368933ff7"}
{"__data__": {"id_": "9b9f2a3c-fbbf-47e0-93eb-774ef624f0fc" . . . .

Second ingestion

{"doc_hash": "d87e7ab6dc1a238af8719edc739f337efa095a9f1414e315b95665b368933ff7"}
{"__data__": {"id_": "331b10c8-c02d-42e0-b4f9-9f72e8124595" . . . . 

Obviously, the hash is the same but the ID is different. I cannot figure out why this is occurring, but I'm wondering if it could be due to calling get_docstore too many times rather than calling it once and passing it to the other functions.
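One quick way to narrow that down (a sketch, not from the code above) is to print the IDs straight from the reader, before any docstore is involved; if they already differ between runs, the docstore wiring is not the cause:

docs = SimpleDirectoryReader(
    local_vault_copy_path,
    recursive=True,
    required_exts=included_extensions,
).load_data()
for d in docs:
    # Same hash but a new doc_id on every run points at ID generation, not the docstore.
    print(d.doc_id, d.hash)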

kyouens (Author) commented May 14, 2024

I discovered that if I add filename_as_id=True in my call to SimpleDirectoryReader, the duplication issue does not occur.

        documents = SimpleDirectoryReader(
            local_vault_copy_path,
            recursive=True,
            required_exts=included_extensions,
            file_metadata=get_meta,
            filename_as_id=True
        ).load_data()

This effectively solves my problem, but I am still curious why the programmatically generated document ID differed between runs and caused the duplication before.
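For anyone hitting the same thing, a simplified sketch of the check a docstore-backed pipeline makes (illustrative, not the library's actual internals): the lookup is keyed on the document ID before hashes are compared, and unless filename_as_id=True is set or an explicit id_ is assigned, the reader generates a fresh random ID on every run, so each document looks brand new despite the identical hash.

# Simplified, illustrative dedup sketch keyed on the document ID
# (ingest/update below are placeholders, not real library calls).
existing_hash = docstore.get_document_hash(doc.doc_id)
if existing_hash is None:
    ingest(doc)        # ID never seen before -> treated as a brand-new document
elif existing_hash != doc.hash:
    update(doc)        # same ID, changed content -> re-process and update
else:
    pass               # same ID, same hash -> skipped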

@kyouens kyouens closed this as completed May 14, 2024