micropython/tests/thread/thread_qstr1.py
Damien George 2265d70add tests/thread: Adjust thread tests so most are able to run on rp2 port.
The aim of this commit is to make it so that the existing thread tests can
be used to test the _thread module on the rp2 port.  The rp2 port only
allows up to one thread to be created at a time, and does not have the GIL
enabled.

The following changes have been made:
- run-tests.py skips mutation tests on rp2, because there's no GIL.
- run-tests.py skips other tests on rp2 that require more than one thread.
- The tests stop trying to start a new thread after there is an OSError,
  which indicates that the system cannot create more threads.
- Some of these tests also now run the test function on the main thread,
  not just the spawned threads.
- In some tests the output printing is adjusted so it's the same regardless
  of how many threads were spawned.
- Some time.sleep(1) are replaced with time.sleep(0) to make the tests run
  a little faster (finish sooner when the work is done).

For the most part the tests are unchanged for existing platforms like esp32
and unix.

Signed-off-by: Damien George <damien@micropython.org>
2024-01-05 10:02:27 +11:00
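
The spawn-then-fall-back behaviour described in the commit message can be sketched roughly as below. This is a minimal illustration, not code from the test suite: the names `spawn_until_refused`, `make_args`, `worker` and `done` are hypothetical, and it assumes (as the commit message states) that a port signals "no more threads" with OSError.

```python
import time
import _thread


def spawn_until_refused(fn, make_args, max_threads):
    """Start up to max_threads threads, stopping at the first OSError.

    Returns the number of threads actually started (hypothetical helper).
    """
    started = 0
    for _ in range(max_threads):
        try:
            _thread.start_new_thread(fn, make_args(started))
            started += 1
        except OSError:
            # Assumed failure signal: the port cannot create another thread.
            break
    return started


lock = _thread.allocate_lock()
done = []


def worker(idx):
    # Record that this worker ran; the lock protects the shared list.
    with lock:
        done.append(idx)


n_started = spawn_until_refused(worker, lambda i: (i,), 4)

# Also run the work on the main thread, using the next free index.
worker(n_started)

# Yield until every worker (spawned threads plus the main thread) has finished.
while True:
    with lock:
        if len(done) == n_started + 1:
            break
    time.sleep(0)

print(sorted(done) == list(range(n_started + 1)))
```

Because the result depends only on how many workers ran, not on which thread ran them, the final check holds whether the port started four threads, one, or none.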


# test concurrent interning of strings
#
# MIT license; Copyright (c) 2016 Damien P. George on behalf of Pycom Ltd
import time
import _thread


# function to check the interned string
def check(s, val):
    assert type(s) == str
    assert int(s) == val


# function run by each thread (including the main thread)
def th(base, n):
    for i in range(n):
        # this will intern the string and check it
        exec("check('%u', %u)" % (base + i, base + i))

    with lock:
        global n_finished
        n_finished += 1


lock = _thread.allocate_lock()
n_thread = 0
n_thread_max = 4
n_finished = 0
n_qstr_per_thread = 100  # make 1000 for a more stressful test (uses more heap)

# spawn threads
for _ in range(n_thread_max):
    try:
        _thread.start_new_thread(th, (n_thread * n_qstr_per_thread, n_qstr_per_thread))
        n_thread += 1
    except OSError:
        # System cannot create a new thread, so stop trying to create them.
        break

# also run the function on this main thread
th(n_thread * n_qstr_per_thread, n_qstr_per_thread)
n_thread += 1

# wait for threads to finish
while n_finished < n_thread:
    time.sleep(0)

print("pass")
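
The test above hands each thread its own block of integers, so every thread builds and checks a distinct set of numeric strings. A single-threaded sketch of that partitioning follows. The helper `disjoint_ranges` and the list `seen` are hypothetical names introduced for illustration, and `check` is passed to `exec` through an explicit namespace here rather than relying on module globals as the test does.

```python
def check(s, val):
    # Same check as in the test: the literal must be a str that parses to val.
    assert type(s) == str
    assert int(s) == val


def disjoint_ranges(n_workers, per_worker):
    """Return (base, count) pairs so that no two workers share a number."""
    return [(k * per_worker, per_worker) for k in range(n_workers)]


seen = []
for base, n in disjoint_ranges(3, 4):
    for i in range(n):
        # Build source text containing a fresh string literal, then run it.
        src = "check('%u', %u)" % (base + i, base + i)
        exec(src, {"check": check})
        seen.append(base + i)

# Every value from 0 to 11 is visited exactly once, in order.
print(seen == list(range(12)))
```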