liamk-ultra
liamk-ultra•2mo ago

StorageClients w/ Multiple Crawlers

Hi! This is my first time using Crawlee, and ... so far, so good. It's working. However, I noticed it was using the default FileSystemStorage and creating files locally on my development machine. That's less than ideal in production. Changing to MemoryStorageClient revealed some other problems. I'm running multiple PlaywrightCrawlers asynchronously. The reason for that is that I want to process the scraped documents in batches, i.e. per site. Also, it's easier to keep things isolated that way. (Each target has its own set of starting urls, link patterns to enqueue, and selectors to select.) However, this fails with MemoryStorageClient because the first crawler gets the memory, and subsequent ones generate an error:
Error crawling target dummy2: Service StorageClient is already in use. Existing value: <crawlee.storage_clients._memory._memory_storage_client.MemoryStorageClient object at 0x335e5ef30>, attempted new value: <crawlee.storage_clients._memory._memory_storage_client.MemoryStorageClient object at 0x335e7c710>.
Upon investigation I discovered the docs saying:
The FileSystemStorageClient is not safe for concurrent access from multiple crawler processes. Use it only when running a single crawler process at a time.
So, even though it appears to be working with some basic tests, I'm not confident this approach will work. I actually don't want concurrent access; I want the storage to be separated on a per-crawler basis (or otherwise segmented within the Memory or File storage). I'm not opposed to pointing it at '/tmp' in production, but the warning makes me doubtful that it would work correctly. I did try creating multiple memory clients by setting unique queue_id, store_id and dataset_id, but that resulted in the same error. Is this a limitation, or is there perhaps some other way of doing what I'm trying to do? Thanks for your help!
Mantisus
Mantisus•2mo ago
Hey @liamk-ultra Could you give an example for reproduction? And what version are you using? This functionality is only available in beta releases for now. Using the following approach, I don't get any errors:
import asyncio

from crawlee import service_locator
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
from crawlee.storage_clients import MemoryStorageClient
from crawlee.storages import RequestQueue


async def main() -> None:
    storage_client = MemoryStorageClient()

    service_locator.set_storage_client(storage_client)

    crawler_queue_1 = await RequestQueue.open(name='crawler-queue-1')
    crawler_queue_2 = await RequestQueue.open(name='crawler-queue-2')

    crawler1 = HttpCrawler(request_manager=crawler_queue_1)
    crawler2 = HttpCrawler(request_manager=crawler_queue_2)

    @crawler1.router.default_handler
    @crawler2.router.default_handler
    async def request_handler(context: HttpCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url} ...')

    await asyncio.gather(
        crawler1.run(['https://crawlee.dev/']),
        crawler2.run(['https://crawlee.dev/']),
    )


if __name__ == '__main__':
    asyncio.run(main())
liamk-ultra
liamk-ultraOP•2mo ago
Hi, Mantisus, Thanks for your reply. I'm using 0.6.11. Maybe that's the problem? Because I can't call MemoryStorageClient() without any parameters the way you're doing in the example above. When I run your example it throws:
TypeError: MemoryStorageClient.__init__() missing 6 required keyword-only arguments: 'write_metadata', 'persist_storage', 'storage_dir', 'default_request_queue_id', 'default_key_value_store_id', and 'default_dataset_id'
What beta release should I be using?
Mantisus
Mantisus•2mo ago
Correct separation into FileSystemStorage and MemoryStorage will be available from the next release. For 0.6.11, you can disable file system persistence through configuration by setting write_metadata and persist_storage to False. The problem is that we don't have documentation versioning yet, so the documentation on the website is not compatible with 0.6.11 and describes the next release. 😢 You can try using 0.6.12b34; then my example will work for you.
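A rough sketch of what that 0.6.11 workaround might look like, assuming the Configuration object exposes the persist_storage and write_metadata flags mentioned above (the exact import path and field names are assumptions, not verified against 0.6.11):

import asyncio

from crawlee import service_locator
from crawlee.configuration import Configuration


async def main() -> None:
    # Assumed 0.6.11 workaround: keep the default storage but turn off
    # on-disk persistence so nothing is written to the local filesystem.
    config = Configuration(
        persist_storage=False,  # don't persist request/dataset records to disk
        write_metadata=False,   # don't write metadata JSON files
    )
    service_locator.set_configuration(config)
    # ... create crawlers as usual after this point.


if __name__ == '__main__':
    asyncio.run(main())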
liamk-ultra
liamk-ultraOP•2mo ago
Okay, that pointed me in the right direction. The trick was to create the different crawler queues, per your example, and associate them with the crawler request_manager. This was successful using 0.6.12b34. Thanks for your help!
Mantisus
Mantisus•2mo ago
Glad it was helpful. 🙂
liamk-ultra
liamk-ultraOP•2mo ago
Is there any need to create duplicates of the KeyValueStore, in addition to the RequestQueue? Now that I've been digging into it a bit more, I think I may not be taking full advantage of Crawlee.
I have various "targets", each of which has its own list of starting urls, regular expressions for enqueuing links, a selector for grabbing particular chunks of text, code for dealing with popups, and other information specific to crawling and processing that particular site. For efficiency, I want the sites processed at the same time; I'm calling them with asyncio.as_completed(). So, I have ended up creating a separate PlaywrightCrawler for each site, each one using a different RequestQueue as you recommended. That way they get batched by site, and I can do other processing on the returned data as it arrives. This has already sped things up quite a bit.
It seems like there might be a better way, given the way Crawlee is designed. Like maybe it would be possible to pass the target data in the context (with the UserData?) so that the request handler can know which request queue to use, and apply the other values to find links and selectors. But... there's only one request queue per crawler, right? I'm not explaining this very well, but it would be nice to just have a single crawler and have it return the documents for each separate target when the crawl for that target has finished. Any tips on whether something like that might work?
Josef
Josef•2mo ago
Hello, you can add different request handlers for different links based on their label. Maybe that could help you? See this example code: https://crawlee.dev/python/docs/introduction/refactoring#request-routing
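For reference, a minimal sketch of what label-based routing looks like with a PlaywrightCrawler, roughly along the lines of the linked guide; the 'DETAIL' label and handler names here are illustrative, not taken from the thread:

from crawlee.crawlers import PlaywrightCrawler, PlaywrightCrawlingContext

crawler = PlaywrightCrawler()


@crawler.router.default_handler
async def default_handler(context: PlaywrightCrawlingContext) -> None:
    # Top-level pages land here; matching links get enqueued with a label.
    await context.enqueue_links(label='DETAIL')


@crawler.router.handler('DETAIL')
async def detail_handler(context: PlaywrightCrawlingContext) -> None:
    # Requests enqueued with label='DETAIL' are routed to this handler.
    context.log.info(f'Detail page: {context.request.url}')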
Mantisus
Mantisus•2mo ago
I'm not explaining this very well, but it would be nice to just have the single crawler and have it return the documents for each separate target when the crawl for that target has finished. Any tips on whether something like that might work?
From your description, it seems to me that you should use Label for different handlers. This way, you can specify a specific Label based on certain metadata. And if I understand correctly, if you need to get data somewhere else for processing, you can use Future. For example, as in the guide with the web server - https://crawlee.dev/python/docs/guides/running-in-web-server
liamk-ultra
liamk-ultraOP•2mo ago
The way I have it working now is that the target instance gets serialized and added as user_data to the context. The router does use the label to determine whether we're on a top-level page (i.e. the default handler) or a linked page with the label "EVENT", in which case it uses a different method.
I'll try explaining this again. A Target is a class with data specific to each targeted site, so it has a list of start urls, a regular expression for extracting links from those pages, a selector and some bs4 cleanup code. So a simplified version looks something like this:
for each target in target_list:
    create a PlaywrightCrawler() and crawl target.urls, using crawlerqueue{target.name} as the request queue, passing in the target as user_data
    return when the target's crawler queue is exhausted
This works great with a handful of targets, though I haven't tested it with more. The crawlers return in async as_completed order, which is what I want. As you can see, this results in lots of separate crawlers, which is what you're doing in your example. I'm just wondering, because I'm unfamiliar with Crawlee, whether having many crawlers running at the same time is efficient, or whether it's possible for one crawler to maintain separate queues:
create a single PlaywrightCrawler()
for target in target_list:
    add target.urls to the queue, passing in the user_data with them
return results as they become available for each target
So maybe that means yielding the returned documents when the crawl process is finished for that target.urls and its enqueued links. But it's not clear to me how you would know when the target crawl has been completed, nor whether a crawler can have multiple queues. Without some way of distinguishing the requests, they all get thrown into the same queue. BTW, crawlee + optimizations have already cut the total scrape time by 75%, so I'm pretty happy with it.
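A condensed sketch of that per-target flow, assuming a hypothetical Target object with name and urls attributes (the names and the result shape are illustrative, not from the thread):

import asyncio

from crawlee import Request
from crawlee.crawlers import PlaywrightCrawler, PlaywrightCrawlingContext
from crawlee.storages import RequestQueue


async def crawl_target(target) -> tuple[str, list[dict]]:
    # One named queue and one crawler per target keeps results batched per site.
    queue = await RequestQueue.open(name=f'crawler-queue-{target.name}')
    crawler = PlaywrightCrawler(request_manager=queue)
    documents: list[dict] = []

    @crawler.router.default_handler
    async def handler(context: PlaywrightCrawlingContext) -> None:
        # Target-specific regexes/selectors would be applied here,
        # read back from context.request.user_data.
        documents.append({'url': context.request.url})

    await crawler.run([
        Request.from_url(url, user_data={'target_name': target.name})
        for url in target.urls
    ])
    return target.name, documents


async def main(targets) -> None:
    tasks = [asyncio.create_task(crawl_target(t)) for t in targets]
    # Process each target's batch as soon as its crawl finishes.
    for finished in asyncio.as_completed(tasks):
        name, docs = await finished
        print(f'{name}: {len(docs)} documents')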
Mantisus
Mantisus•2mo ago
Overall, I think I understand your flow.
if having many crawlers running at the same time is efficient
This can be more resource-intensive than a single crawler. Especially when it comes to Playwright. But if resources are not a problem for you and this works for you, then this approach is perfectly viable.
if it's possible for one crawler to maintain separate queues.
No, a single crawler cannot work with multiple queues. Here is one possible approach you could try:
import asyncio

from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
from crawlee.storages import RequestQueue
from crawlee import Request

futures_results = {}
targets = [
    {'name': 'Apify', 'urls': ['https://apify.com/', 'https://docs.apify.com/']},
    {'name': 'Crawlee', 'urls': ['https://crawlee.dev/', 'https://crawlee.dev/python']},
]


async def expect_results() -> None:
    process_counter = len(targets)
    while process_counter > 0:
        for target_name, future in futures_results.items():
            if future and future.done():
                print(f"Results for {target_name}: Done!")
                process_counter -= 1
                futures_results[target_name] = None

        await asyncio.sleep(0.1)  # Wait for the requests to be processed


async def main() -> None:
    request_queue = await RequestQueue.open()
    crawler = HttpCrawler(request_manager=request_queue)
    state = await crawler._use_state()
    for target in targets:
        state[target['name']] = len(target['urls'])
        for url in target['urls']:
            await request_queue.add_request(Request.from_url(url, user_data={'target_name': target['name']}))
        futures_results[target['name']] = asyncio.Future()

    @crawler.router.default_handler
    async def request_handler(context: HttpCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url} ...')

        target_name = context.request.user_data.get('target_name', 'Unknown')
        state = await context.use_state()
        state[target_name] -= 1
        if state[target_name] == 0:
            futures_results[target_name].set_result(None)

        await asyncio.sleep(0.1)  # Simulate some processing time

    asyncio.create_task(expect_results())

    await crawler.run()


asyncio.run(main())
This is just an example, as I am not familiar with your working environment. But as I mentioned earlier, you can use a Future to report that a target has already been scanned. In this example, the expect_results function tracks which targets have already been processed. I use use_state to track progress because it is protected against race conditions. However, it was originally planned as a Context method, so at the top level it looks a little messy when I have to call the private method crawler._use_state(). This may give you some ideas for improving your solution. In any case, I am glad that you have already achieved good optimization.
