
[Bug]: Error when setting `use_async=True` when using SubQuestionQueryEngine #10440

Closed
mw-hv opened this issue Feb 4, 2024 · 5 comments
Labels
bug Something isn't working triage Issue needs to be triaged/prioritized

Comments

@mw-hv

mw-hv commented Feb 4, 2024

Bug Description

Hi, I keep getting the error below whenever I set `use_async=True` when using SubQuestionQueryEngine; is this a bug? If I set it to False, the issue goes away.

Setup of the sub-question query engine:

query_engine = SubQuestionQueryEngine.from_defaults(
    query_engine_tools=query_engine_tools,
    service_context=service_context,
    use_async=False,  # the error occurs when this is True
)

Here is the error info:


AttributeError Traceback (most recent call last)
in
3 Return your final answer in postgresql.'''
4
----> 5 response3 = query_engine.query(query_str)
6 print(response3)

~\AppData\Roaming\Python\Python38\site-packages\llama_index\core\base_query_engine.py in query(self, str_or_query_bundle)
38 if isinstance(str_or_query_bundle, str):
39 str_or_query_bundle = QueryBundle(str_or_query_bundle)
---> 40 return self._query(str_or_query_bundle)
41
42 async def aquery(self, str_or_query_bundle: QueryType) -> RESPONSE_TYPE:

~\AppData\Roaming\Python\Python38\site-packages\llama_index\query_engine\retriever_query_engine.py in _query(self, query_bundle)
169 CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
170 ) as query_event:
--> 171 nodes = self.retrieve(query_bundle)
172 response = self._response_synthesizer.synthesize(
173 query=query_bundle,

~\AppData\Roaming\Python\Python38\site-packages\llama_index\query_engine\retriever_query_engine.py in retrieve(self, query_bundle)
125
126 def retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
--> 127 nodes = self._retriever.retrieve(query_bundle)
128 return self._apply_node_postprocessors(nodes, query_bundle=query_bundle)
129

~\AppData\Roaming\Python\Python38\site-packages\llama_index\core\base_retriever.py in retrieve(self, str_or_query_bundle)
59 payload={EventPayload.QUERY_STR: query_bundle.query_str},
60 ) as retrieve_event:
---> 61 nodes = self._retrieve(query_bundle)
62 retrieve_event.on_end(
63 payload={EventPayload.NODES: nodes},

~\AppData\Roaming\Python\Python38\site-packages\llama_index\retrievers\recursive_retriever.py in _retrieve(self, query_bundle)
182
183 def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
--> 184 retrieved_nodes, _ = self._retrieve_rec(query_bundle, query_id=None)
185 return retrieved_nodes
186

~\AppData\Roaming\Python\Python38\site-packages\llama_index\retrievers\recursive_retriever.py in _retrieve_rec(self, query_bundle, query_id, cur_similarity)
158 event.on_end(payload={EventPayload.NODES: nodes})
159
--> 160 nodes_to_add, additional_nodes = self._query_retrieved_nodes(
161 query_bundle, nodes
162 )

~\AppData\Roaming\Python\Python38\site-packages\llama_index\retrievers\recursive_retriever.py in _query_retrieved_nodes(self, query_bundle, nodes_with_score)
96 color="pink",
97 )
---> 98 cur_retrieved_nodes, cur_additional_nodes = self._retrieve_rec(
99 query_bundle,
100 query_id=node.index_id,

~\AppData\Roaming\Python\Python38\site-packages\llama_index\retrievers\recursive_retriever.py in _retrieve_rec(self, query_bundle, query_id, cur_similarity)
163
164 elif isinstance(obj, BaseQueryEngine):
--> 165 sub_resp = obj.query(query_bundle)
166 if self._verbose:
167 print_text(

~\AppData\Roaming\Python\Python38\site-packages\llama_index\core\base_query_engine.py in query(self, str_or_query_bundle)
38 if isinstance(str_or_query_bundle, str):
39 str_or_query_bundle = QueryBundle(str_or_query_bundle)
---> 40 return self._query(str_or_query_bundle)
41
42 async def aquery(self, str_or_query_bundle: QueryType) -> RESPONSE_TYPE:

~\AppData\Roaming\Python\Python38\site-packages\llama_index\query_engine\sub_question_query_engine.py in _query(self, query_bundle)
140 ]
141
--> 142 qa_pairs_all = run_async_tasks(tasks)
143 qa_pairs_all = cast(List[Optional[SubQuestionAnswerPair]], qa_pairs_all)
144 else:

~\AppData\Roaming\Python\Python38\site-packages\llama_index\async_utils.py in run_async_tasks(tasks, show_progress, progress_bar_desc)
47 return await asyncio.gather(*tasks_to_execute)
48
---> 49 outputs: List[Any] = asyncio.run(_gather())
50 return outputs
51

~\AppData\Roaming\Python\Python38\site-packages\nest_asyncio.py in run(main, debug)
29 task = asyncio.ensure_future(main)
30 try:
---> 31 return loop.run_until_complete(task)
32 finally:
33 if not task.done():

~\AppData\Roaming\Python\Python38\site-packages\nest_asyncio.py in run_until_complete(self, future)
97 raise RuntimeError(
98 'Event loop stopped before Future completed.')
---> 99 return f.result()
100
101 def _run_once(self):

C:\ProgramData\Anaconda3\lib\asyncio\futures.py in result(self)
176 self.__log_traceback = False
177 if self._exception is not None:
--> 178 raise self._exception
179 return self._result
180

C:\ProgramData\Anaconda3\lib\asyncio\tasks.py in __step(failed resolving arguments)
280 result = coro.send(None)
281 else:
--> 282 result = coro.throw(exc)
283 except StopIteration as exc:
284 if self._must_cancel:

~\AppData\Roaming\Python\Python38\site-packages\llama_index\async_utils.py in _gather()
45
46 async def _gather() -> List[Any]:
---> 47 return await asyncio.gather(*tasks_to_execute)
48
49 outputs: List[Any] = asyncio.run(_gather())

C:\ProgramData\Anaconda3\lib\asyncio\tasks.py in __wakeup(self, future)
347 def __wakeup(self, future):
348 try:
--> 349 future.result()
350 except BaseException as exc:
351 # This may also be a cancellation.

C:\ProgramData\Anaconda3\lib\asyncio\tasks.py in __step(failed resolving arguments)
278 # We use the send method directly, because coroutines
279 # don't have __iter__ and __next__ methods.
--> 280 result = coro.send(None)
281 else:
282 result = coro.throw(exc)

~\AppData\Roaming\Python\Python38\site-packages\llama_index\query_engine\sub_question_query_engine.py in _aquery_subq(self, sub_q, color)
221 print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)
222
--> 223 response = await query_engine.aquery(question)
224 response_text = str(response)
225

~\AppData\Roaming\Python\Python38\site-packages\llama_index\core\base_query_engine.py in aquery(self, str_or_query_bundle)
44 if isinstance(str_or_query_bundle, str):
45 str_or_query_bundle = QueryBundle(str_or_query_bundle)
---> 46 return await self._aquery(str_or_query_bundle)
47
48 def retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:

~\AppData\Roaming\Python\Python38\site-packages\llama_index\query_engine\retriever_query_engine.py in _aquery(self, query_bundle)
184 CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
185 ) as query_event:
--> 186 nodes = await self.aretrieve(query_bundle)
187
188 response = await self._response_synthesizer.asynthesize(

~\AppData\Roaming\Python\Python38\site-packages\llama_index\query_engine\retriever_query_engine.py in aretrieve(self, query_bundle)
129
130 async def aretrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
--> 131 nodes = await self._retriever.aretrieve(query_bundle)
132 return self._apply_node_postprocessors(nodes, query_bundle=query_bundle)
133

~\AppData\Roaming\Python\Python38\site-packages\llama_index\core\base_retriever.py in aretrieve(self, str_or_query_bundle)
77 payload={EventPayload.QUERY_STR: query_bundle.query_str},
78 ) as retrieve_event:
---> 79 nodes = await self._aretrieve(query_bundle)
80 retrieve_event.on_end(
81 payload={EventPayload.NODES: nodes},

~\AppData\Roaming\Python\Python38\site-packages\llama_index\indices\vector_store\retrievers\retriever.py in _aretrieve(self, query_bundle)
97 )
98 )
---> 99 return await self._aget_nodes_with_embeddings(query_bundle)
100
101 def _build_vector_store_query(

~\AppData\Roaming\Python\Python38\site-packages\llama_index\indices\vector_store\retrievers\retriever.py in _aget_nodes_with_embeddings(self, query_bundle_with_embeddings)
169 ) -> List[NodeWithScore]:
170 query = self._build_vector_store_query(query_bundle_with_embeddings)
--> 171 query_result = await self._vector_store.aquery(query, **self._kwargs)
172 return self._build_node_list_from_query_result(query_result)

~\AppData\Roaming\Python\Python38\site-packages\llama_index\vector_stores\qdrant.py in aquery(self, query, **kwargs)
558 elif self.enable_hybrid:
559 # search for dense vectors only
--> 560 response = await self._aclient.search_batch(
561 collection_name=self.collection_name,
562 requests=[

AttributeError: 'NoneType' object has no attribute 'search_batch'
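The final frame shows that `self._aclient` is `None`: the vector store was constructed without an async Qdrant client, so the async retrieval path ends up calling `search_batch` on `None`. A minimal stdlib-only sketch (with a hypothetical `Store` class standing in for the vector store) reproduces the failure mode:

```python
import asyncio

class Store:
    """Minimal stand-in for a vector store that takes sync and async clients."""

    def __init__(self, client=None, aclient=None):
        self._client = client
        self._aclient = aclient  # stays None unless explicitly provided

    async def aquery(self, query):
        # the async retrieval path assumes an async client was supplied
        return await self._aclient.search_batch(query)

store = Store(client=object())  # sync client only, as in the report
try:
    asyncio.run(store.aquery("q"))
except AttributeError as exc:
    print(exc)  # 'NoneType' object has no attribute 'search_batch'
```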

Version

llama-index 0.9.27

Steps to Reproduce

Set up the sub-question query engine with `use_async=True`:

query_engine = SubQuestionQueryEngine.from_defaults(
    query_engine_tools=query_engine_tools,
    service_context=service_context,
    use_async=True
)

Relevant Logs/Tracebacks

No response

@mw-hv mw-hv added bug Something isn't working triage Issue needs to be triaged/prioritized labels Feb 4, 2024

dosubot bot commented Feb 4, 2024

🤖

Hey @mw-hv! Great to see you diving into the async depths with us again. 🌀

Thank you for providing the detailed error log. It seems like the issue is related to the use_async parameter in the SubQuestionQueryEngine class.

The use_async parameter determines whether the sub-questions should be executed asynchronously or not. If use_async is set to True, the sub-questions are executed asynchronously using asyncio. However, if use_async is set to False, the sub-questions are executed synchronously.
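The concurrent path can be sketched with the standard library alone: the engine fans the generated sub-questions out through `asyncio.gather` (via its `run_async_tasks` helper, visible in the traceback). Here a hypothetical `answer` coroutine stands in for querying one tool's engine:

```python
import asyncio

async def answer(sub_question: str) -> str:
    # stands in for one tool's async query call
    await asyncio.sleep(0)  # yield to the loop, as real I/O would
    return f"answer to {sub_question!r}"

async def gather_answers(sub_questions):
    # all sub-questions are awaited concurrently on one event loop
    return await asyncio.gather(*(answer(q) for q in sub_questions))

results = asyncio.run(gather_answers(["q1", "q2"]))
print(results)  # ["answer to 'q1'", "answer to 'q2'"]
```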

The error occurs because the use_async parameter is set to False in the __init__ method of the SubQuestionQueryEngine class. This means that by default, the class does not support asynchronous execution of sub-questions.

However, in the from_defaults class method, use_async is set to True by default. This means that if you're using this method to create an instance of SubQuestionQueryEngine and you don't explicitly set use_async to False, the class will attempt to execute sub-questions asynchronously, which it doesn't support by default.

To fix the error, you can either set use_async to False when using the from_defaults method, or modify the __init__ method to support asynchronous execution by default.

Here is the relevant code:

def __init__(
    self,
    question_gen: BaseQuestionGenerator,
    response_synthesizer: BaseSynthesizer,
    query_engine_tools: Sequence[QueryEngineTool],
    callback_manager: Optional[CallbackManager] = None,
    verbose: bool = True,
    use_async: bool = False,
) -> None:
    self._use_async = use_async
    ...
@classmethod
def from_defaults(
    cls,
    query_engine_tools: Sequence[QueryEngineTool],
    question_gen: Optional[BaseQuestionGenerator] = None,
    response_synthesizer: Optional[BaseSynthesizer] = None,
    service_context: Optional[ServiceContext] = None,
    verbose: bool = True,
    use_async: bool = True,
) -> "SubQuestionQueryEngine":
    ...  # question_gen, synth, and callback_manager are resolved above this point (elided)
    return cls(
        question_gen,
        synth,
        query_engine_tools,
        callback_manager=callback_manager,
        verbose=verbose,
        use_async=use_async,
    )

You can find this code in the llama_index/query_engine/sub_question_query_engine.py file.
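The default mismatch described above can be illustrated with a stdlib-only sketch (hypothetical `Engine` class): direct construction defaults to the sync path, while the factory classmethod flips the default, so callers of the factory opt into async without asking:

```python
class Engine:
    def __init__(self, use_async: bool = False) -> None:
        self._use_async = use_async

    @classmethod
    def from_defaults(cls, use_async: bool = True) -> "Engine":
        # the factory flips the default, so async is on unless disabled
        return cls(use_async=use_async)

print(Engine()._use_async)                # direct construction: False
print(Engine.from_defaults()._use_async)  # via the factory: True
```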

I hope this helps! If you have any other questions, feel free to ask.


@logan-markewich
Collaborator

I see you are using Qdrant? You will probably also have to provide the aclient in the constructor:

QdrantVectorStore(client=client, aclient=aclient)

tbh we should probably fall back to the base client + sync operation when an async client is not provided

@mw-hv
Author

mw-hv commented Feb 7, 2024

@logan-markewich Thanks for your reply, Logan! Could you share any documentation on how to create the aclient? I searched the LlamaIndex docs but didn't find a direct link to it. Thanks!

@logan-markewich
Collaborator

import qdrant_client 

aclient = qdrant_client.AsyncQdrantClient(...)

@dosubot dosubot bot added the stale Issue has not had recent activity or appears to be solved. Stale issues will be automatically closed label May 8, 2024

dosubot bot commented May 8, 2024

Hi, @mw-hv,

I'm helping the LlamaIndex team manage their backlog and am marking this issue as stale. You encountered an AttributeError when setting `use_async=True` with SubQuestionQueryEngine in llama-index 0.9.27. I provided a detailed explanation suggesting the issue is related to the `use_async` parameter, with code snippets to illustrate. logan-markewich pointed out that the Qdrant vector store also needs an async client (aclient) in its constructor and shared code for creating one. You then requested documentation on creating the aclient.

Could you please confirm if this issue is still relevant to the latest version of the LlamaIndex repository? If it is, please let the LlamaIndex team know by commenting on the issue. Otherwise, feel free to close the issue yourself, or the issue will be automatically closed in 7 days.

Thank you for your understanding and cooperation. If you have any further questions or need assistance, feel free to reach out.

@dosubot dosubot bot closed this as not planned Won't fix, can't repro, duplicate, stale May 15, 2024
@dosubot dosubot bot removed the stale Issue has not had recent activity or appears to be solved. Stale issues will be automatically closed label May 15, 2024