Linux ip-172-26-2-223 5.4.0-1018-aws #18-Ubuntu SMP Wed Jun 24 01:15:00 UTC 2020 x86_64
Apache
: 172.26.2.223 | : 13.58.170.28
Cant Read [ /etc/named.conf ]
8.1.13
www
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
BLACK DEFEND!
README
+ Create Folder
+ Create File
/
snap /
core24 /
888 /
usr /
lib /
python3.12 /
test /
support /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
_hypothesis_stubs
[ DIR ]
drwxr-xr-x
__init__.py
78.13
KB
-rw-r--r--
ast_helper.py
1.79
KB
-rw-r--r--
asynchat.py
11.33
KB
-rw-r--r--
asyncore.py
19.9
KB
-rw-r--r--
bytecode_helper.py
4.88
KB
-rw-r--r--
hashlib_helper.py
1.86
KB
-rw-r--r--
hypothesis_helper.py
1.35
KB
-rw-r--r--
import_helper.py
10.48
KB
-rw-r--r--
interpreters.py
5.67
KB
-rw-r--r--
logging_helper.py
916
B
-rw-r--r--
os_helper.py
23.72
KB
-rw-r--r--
pty_helper.py
2.98
KB
-rw-r--r--
script_helper.py
11.45
KB
-rw-r--r--
smtpd.py
30.01
KB
-rwxr-xr-x
socket_helper.py
13.47
KB
-rw-r--r--
testcase.py
1.02
KB
-rw-r--r--
threading_helper.py
7.86
KB
-rw-r--r--
warnings_helper.py
6.69
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : threading_helper.py
import _thread import contextlib import functools import sys import threading import time import unittest from test import support #======================================================================= # Threading support to prevent reporting refleaks when running regrtest.py -R # NOTE: we use thread._count() rather than threading.enumerate() (or the # moral equivalent thereof) because a threading.Thread object is still alive # until its __bootstrap() method has returned, even after it has been # unregistered from the threading module. # thread._count(), on the other hand, only gets decremented *after* the # __bootstrap() method has returned, which gives us reliable reference counts # at the end of a test run. def threading_setup(): return _thread._count(), len(threading._dangling) def threading_cleanup(*original_values): orig_count, orig_ndangling = original_values timeout = 1.0 for _ in support.sleeping_retry(timeout, error=False): # Copy the thread list to get a consistent output. threading._dangling # is a WeakSet, its value changes when it's read. dangling_threads = list(threading._dangling) count = _thread._count() if count <= orig_count: return # Timeout! support.environment_altered = True support.print_warning( f"threading_cleanup() failed to clean up threads " f"in {timeout:.1f} seconds\n" f" before: thread count={orig_count}, dangling={orig_ndangling}\n" f" after: thread count={count}, dangling={len(dangling_threads)}") for thread in dangling_threads: support.print_warning(f"Dangling thread: {thread!r}") # The warning happens when a test spawns threads and some of these threads # are still running after the test completes. To fix this warning, join # threads explicitly to wait until they complete. # # To make the warning more likely, reduce the timeout. def reap_threads(func): """Use this function when threads are being used. This will ensure that the threads are cleaned up even when the test fails. """ @functools.wraps(func) def decorator(*args): key = threading_setup() try: return func(*args) finally: threading_cleanup(*key) return decorator @contextlib.contextmanager def wait_threads_exit(timeout=None): """ bpo-31234: Context manager to wait until all threads created in the with statement exit. Use _thread.count() to check if threads exited. Indirectly, wait until threads exit the internal t_bootstrap() C function of the _thread module. threading_setup() and threading_cleanup() are designed to emit a warning if a test leaves running threads in the background. This context manager is designed to cleanup threads started by the _thread.start_new_thread() which doesn't allow to wait for thread exit, whereas thread.Thread has a join() method. """ if timeout is None: timeout = support.SHORT_TIMEOUT old_count = _thread._count() try: yield finally: start_time = time.monotonic() for _ in support.sleeping_retry(timeout, error=False): support.gc_collect() count = _thread._count() if count <= old_count: break else: dt = time.monotonic() - start_time msg = (f"wait_threads() failed to cleanup {count - old_count} " f"threads after {dt:.1f} seconds " f"(count: {count}, old count: {old_count})") raise AssertionError(msg) def join_thread(thread, timeout=None): """Join a thread. Raise an AssertionError if the thread is still alive after timeout seconds. """ if timeout is None: timeout = support.SHORT_TIMEOUT thread.join(timeout) if thread.is_alive(): msg = f"failed to join the thread in {timeout:.1f} seconds" raise AssertionError(msg) @contextlib.contextmanager def start_threads(threads, unlock=None): try: import faulthandler except ImportError: # It isn't supported on subinterpreters yet. faulthandler = None threads = list(threads) started = [] try: try: for t in threads: t.start() started.append(t) except: if support.verbose: print("Can't start %d threads, only %d threads started" % (len(threads), len(started))) raise yield finally: try: if unlock: unlock() endtime = time.monotonic() for timeout in range(1, 16): endtime += 60 for t in started: t.join(max(endtime - time.monotonic(), 0.01)) started = [t for t in started if t.is_alive()] if not started: break if support.verbose: print('Unable to join %d threads during a period of ' '%d minutes' % (len(started), timeout)) finally: started = [t for t in started if t.is_alive()] if started: if faulthandler is not None: faulthandler.dump_traceback(sys.stdout) raise AssertionError('Unable to join %d threads' % len(started)) class catch_threading_exception: """ Context manager catching threading.Thread exception using threading.excepthook. Attributes set when an exception is caught: * exc_type * exc_value * exc_traceback * thread See threading.excepthook() documentation for these attributes. These attributes are deleted at the context manager exit. Usage: with threading_helper.catch_threading_exception() as cm: # code spawning a thread which raises an exception ... # check the thread exception, use cm attributes: # exc_type, exc_value, exc_traceback, thread ... # exc_type, exc_value, exc_traceback, thread attributes of cm no longer # exists at this point # (to avoid reference cycles) """ def __init__(self): self.exc_type = None self.exc_value = None self.exc_traceback = None self.thread = None self._old_hook = None def _hook(self, args): self.exc_type = args.exc_type self.exc_value = args.exc_value self.exc_traceback = args.exc_traceback self.thread = args.thread def __enter__(self): self._old_hook = threading.excepthook threading.excepthook = self._hook return self def __exit__(self, *exc_info): threading.excepthook = self._old_hook del self.exc_type del self.exc_value del self.exc_traceback del self.thread def _can_start_thread() -> bool: """Detect whether Python can start new threads. Some WebAssembly platforms do not provide a working pthread implementation. Thread support is stubbed and any attempt to create a new thread fails. - wasm32-wasi does not have threading. - wasm32-emscripten can be compiled with or without pthread support (-s USE_PTHREADS / __EMSCRIPTEN_PTHREADS__). """ if sys.platform == "emscripten": return sys._emscripten_info.pthreads elif sys.platform == "wasi": return False else: # assume all other platforms have working thread support. return True can_start_thread = _can_start_thread() def requires_working_threading(*, module=False): """Skip tests or modules that require working threading. Can be used as a function/class decorator or to skip an entire module. """ msg = "requires threading support" if module: if not can_start_thread: raise unittest.SkipTest(msg) else: return unittest.skipUnless(can_start_thread, msg)
Close