Server IP : 172.67.216.182 / Your IP : 108.162.226.180 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /lib/python3/dist-packages/uaclient/api/u/pro/attach/auto/full_auto_attach/ |
Upload File : |
from typing import List, Optional, Tuple

from uaclient import actions, event_logger, lock, messages, util
from uaclient.api import exceptions
from uaclient.api.api import APIEndpoint
from uaclient.api.data_types import AdditionalInfo
from uaclient.config import UAConfig
from uaclient.data_types import DataObject, Field, StringDataValue, data_list
from uaclient.entitlements import order_entitlements_for_enabling
from uaclient.entitlements.entitlement_status import CanEnableFailure

event = event_logger.get_event_logger()


class FullAutoAttachOptions(DataObject):
    """Options accepted by the full auto-attach endpoint.

    Both attributes are optional lists of service names; ``None`` means the
    caller did not supply that list.
    """

    fields = [
        Field("enable", data_list(StringDataValue), False),
        Field("enable_beta", data_list(StringDataValue), False),
    ]

    def __init__(
        self,
        enable: Optional[List[str]] = None,
        enable_beta: Optional[List[str]] = None,
    ):
        self.enable = enable
        self.enable_beta = enable_beta


class FullAutoAttachResult(DataObject, AdditionalInfo):
    """Result object returned on success; it declares no fields of its own."""

    pass


def _enable_services_by_name(
    cfg: UAConfig, services: List[str], allow_beta: bool
) -> List[Tuple[str, messages.NamedMessage]]:
    """Try to enable every requested service and collect the failures.

    Services are processed in the order produced by
    ``order_entitlements_for_enabling``. A failure on one service does not
    stop the remaining ones from being attempted.

    :param cfg: configuration passed through to the enable action.
    :param services: names of the services to enable.
    :param allow_beta: forwarded to ``actions.enable_entitlement_by_name``.
    :return: ``(service_name, message)`` pairs, one per service that could
        not be enabled; empty when everything succeeded.
    """
    failures = []  # type: List[Tuple[str, messages.NamedMessage]]

    for service_name in order_entitlements_for_enabling(cfg, services):
        try:
            enabled, reason = actions.enable_entitlement_by_name(
                cfg, service_name, assume_yes=True, allow_beta=allow_beta
            )
        except exceptions.UserFacingError as err:
            # Record the error and move on to the next service.
            error_msg = messages.NamedMessage(
                err.msg_code or "unknown", err.msg
            )
            failures.append((service_name, error_msg))
            continue

        if enabled:
            continue

        # isinstance() is already False for None, so no separate None test
        # is needed before inspecting the reason.
        has_reason_message = (
            isinstance(reason, CanEnableFailure)
            and reason.message is not None
        )
        if has_reason_message:
            failures.append((service_name, reason.message))
        else:
            generic_msg = messages.NamedMessage("unknown", "failed to enable")
            failures.append((service_name, generic_msg))

    return failures


def full_auto_attach(options: FullAutoAttachOptions) -> FullAutoAttachResult:
    """Run the full auto-attach flow with a ``UAConfig(root_mode=True)``."""
    root_cfg = UAConfig(root_mode=True)
    return _full_auto_attach(options, root_cfg)


def _full_auto_attach(
    options: FullAutoAttachOptions,
    cfg: UAConfig,
    *,
    mode: event_logger.EventLoggerMode = event_logger.EventLoggerMode.JSON
) -> FullAutoAttachResult:
    """Run the auto-attach flow while holding the spin lock.

    If anything raises, the lock file is cleared (when present) before the
    same exception is re-raised to the caller.

    :param options: service lists requested by the caller.
    :param cfg: configuration to operate on.
    :param mode: event logger mode applied inside the locked section.
    :return: the result of ``_full_auto_attach_in_lock``.
    """
    holder = "pro.api.u.pro.attach.auto.full_auto_attach.v1"
    try:
        with lock.SpinLock(cfg=cfg, lock_holder=holder):
            return _full_auto_attach_in_lock(options, cfg, mode=mode)
    except Exception as exc:
        lock.clear_lock_file_if_present()
        raise exc


def _full_auto_attach_in_lock(
    options: FullAutoAttachOptions,
    cfg: UAConfig,
    mode: event_logger.EventLoggerMode,
) -> FullAutoAttachResult:
    """Attach the machine and enable the requested services.

    :param options: service lists requested by the caller.
    :param cfg: configuration to operate on.
    :param mode: mode to set on the module-level event logger.
    :raises exceptions.AlreadyAttachedError: if ``cfg.is_attached`` is true.
    :raises exceptions.AutoAttachDisabledError: if the
        ``features.disable_auto_attach`` config value is true.
    :raises exceptions.EntitlementsNotEnabledError: if at least one
        explicitly requested service could not be enabled.
    :return: an empty ``FullAutoAttachResult`` on success.
    """
    event.set_event_mode(mode)

    if cfg.is_attached:
        account_name = cfg.machine_token_file.account.get("name", "")
        raise exceptions.AlreadyAttachedError(account_name)

    auto_attach_disabled = util.is_config_value_true(
        config=cfg.cfg, path_to_value="features.disable_auto_attach"
    )
    if auto_attach_disabled:
        raise exceptions.AutoAttachDisabledError()

    cloud_instance = actions.get_cloud_instance(cfg)

    # (service list, allow_beta) pairs, in the order they must be processed.
    requested = (
        (options.enable, False),
        (options.enable_beta, True),
    )

    # allow_enable is True only when the caller supplied neither list.
    nothing_requested = all(names is None for names, _ in requested)
    actions.auto_attach(cfg, cloud_instance, allow_enable=nothing_requested)

    not_enabled = []
    for names, beta_allowed in requested:
        if names is None:
            continue
        not_enabled.extend(
            _enable_services_by_name(cfg, names, allow_beta=beta_allowed)
        )

    if not_enabled:
        raise exceptions.EntitlementsNotEnabledError(not_enabled)

    return FullAutoAttachResult()


endpoint = APIEndpoint(
    version="v1",
    name="FullAutoAttach",
    fn=_full_auto_attach,
    options_cls=FullAutoAttachOptions,
)