Server IP : 104.21.38.3 / Your IP : 162.158.190.98 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /usr/lib/python3/dist-packages/uaclient/clouds/ |
Upload File : |
import os
from typing import Any, Dict
from urllib.error import HTTPError

from uaclient import exceptions, system, util
from uaclient.clouds import AutoAttachCloudInstance

IMDS_BASE_URL = "http://169.254.169.254/metadata/"

API_VERSION = "2020-09-01"  # Needed to get subscription ID in attested data
IMDS_URLS = {
    "pkcs7": IMDS_BASE_URL + "attested/document?api-version=" + API_VERSION,
    "compute": IMDS_BASE_URL + "instance/compute?api-version=" + API_VERSION,
}

DMI_CHASSIS_ASSET_TAG = "/sys/class/dmi/id/chassis_asset_tag"
AZURE_OVF_ENV_FILE = "/var/lib/cloud/seed/azure/ovf-env.xml"
AZURE_CHASSIS_ASSET_TAG = "7783-7084-3265-9085-8269-3286-77"


class UAAutoAttachAzureInstance(AutoAttachCloudInstance):
    """Auto-attach cloud instance implementation reporting "azure"."""

    # mypy does not handle @property around inner decorators
    # https://github.com/python/mypy/issues/1362
    @property  # type: ignore
    @util.retry(HTTPError, retry_sleeps=[1, 1, 1])
    def identity_doc(self) -> Dict[str, Any]:
        """Collect the metadata responses for every entry in IMDS_URLS.

        Each URL is read with a ``Metadata: true`` header and a timeout
        of 1. The result maps each IMDS_URLS key to its response, except
        for "pkcs7", where only the response's "signature" value is kept.

        NOTE(review): the decorator presumably re-runs the whole method
        when HTTPError is raised, sleeping between attempts; confirm
        against util.retry.
        """
        doc = {}
        # Walk the endpoints in sorted key order so the request order is
        # deterministic.
        for key in sorted(IMDS_URLS):
            payload, _headers = util.readurl(
                IMDS_URLS[key], headers={"Metadata": "true"}, timeout=1
            )
            doc[key] = payload["signature"] if key == "pkcs7" else payload
        return doc

    @property
    def cloud_type(self) -> str:
        """Return the cloud identifier string for this class."""
        return "azure"

    @property
    def is_viable(self) -> bool:
        """Tell whether this machine looks like an Azure instance.

        True when the DMI chassis asset tag file exists and its stripped
        content equals AZURE_CHASSIS_ASSET_TAG; otherwise the answer is
        whether the Azure OVF environment seed file exists.
        """
        # The tag file is only read once its existence is confirmed.
        tag_matches = os.path.exists(DMI_CHASSIS_ASSET_TAG) and (
            AZURE_CHASSIS_ASSET_TAG
            == system.load_file(DMI_CHASSIS_ASSET_TAG).strip()
        )
        return tag_matches or os.path.exists(AZURE_OVF_ENV_FILE)

    def should_poll_for_pro_license(self) -> bool:
        """Unsupported here: always returns False."""
        return False

    def is_pro_license_present(self, *, wait_for_change: bool) -> bool:
        """Unsupported here: always raises.

        Raises:
            exceptions.InPlaceUpgradeNotSupportedError: unconditionally;
                ``wait_for_change`` is not consulted.
        """
        raise exceptions.InPlaceUpgradeNotSupportedError()