diff --git a/.gitreview b/.gitreview
index 1500a8d6d..9a1259420 100644
--- a/.gitreview
+++ b/.gitreview
@@ -2,3 +2,4 @@
 host=review.opendev.org
 port=29418
 project=openstack/blazar.git
+defaultbranch=stable/yoga
diff --git a/blazar/db/api.py b/blazar/db/api.py
index 2f9d3d052..c41b262cf 100644
--- a/blazar/db/api.py
+++ b/blazar/db/api.py
@@ -378,6 +378,28 @@ def host_update(host_id, values):
     IMPL.host_update(host_id, values)
 
 
+# ComputeHostResourceInventory
+
+def host_custom_resource_create(values):
+    """Create a Host CustomResource from the values."""
+    return IMPL.host_custom_resource_create(values)
+
+
+def host_custom_resource_get_all_per_host(host_id):
+    """Return all custom resources belonging to a specific Compute host."""
+    return IMPL.host_custom_resource_get_all_per_host(host_id)
+
+
+# ComputeHostTrait
+
+def host_trait_create(values):
+    """Create a Host Trait from the values."""
+    return IMPL.host_trait_create(values)
+
+
+def host_trait_get_all_per_host(host_id):
+    """Return all traits belonging to a specific Compute host."""
+    return IMPL.host_trait_get_all_per_host(host_id)
+
+
 # ComputeHostExtraCapabilities
 
 def host_extra_capability_create(values):
diff --git a/blazar/db/migration/alembic_migrations/versions/2ae105568b6d_add_inventory_traits.py b/blazar/db/migration/alembic_migrations/versions/2ae105568b6d_add_inventory_traits.py
new file mode 100644
index 000000000..e9d69db83
--- /dev/null
+++ b/blazar/db/migration/alembic_migrations/versions/2ae105568b6d_add_inventory_traits.py
@@ -0,0 +1,64 @@
+# Copyright 2023 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add computehost resource inventory and trait tables
+
+Revision ID: 2ae105568b6d
+Revises: e20cbce0504c
+Create Date: 2023-07-06 15:46:28.036821
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '2ae105568b6d'
+down_revision = 'e20cbce0504c'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('computehost_resource_inventory',
+    sa.Column('created_at', sa.DateTime(), nullable=True),
+    sa.Column('updated_at', sa.DateTime(), nullable=True),
+    sa.Column('id', sa.String(length=36), nullable=False),
+    sa.Column('computehost_id', sa.String(length=36), nullable=True),
+    sa.Column('resource_class', sa.String(length=255), nullable=False),
+    sa.Column('allocation_ratio', sa.Float(), nullable=False),
+    sa.Column('total', sa.Integer(), nullable=False),
+    sa.Column('reserved', sa.Integer(), nullable=False),
+    sa.Column('max_unit', sa.Integer(), nullable=False),
+    sa.Column('min_unit', sa.Integer(), nullable=False),
+    sa.ForeignKeyConstraint(['computehost_id'], ['computehosts.id'], ),
+    sa.PrimaryKeyConstraint('id')
+    )
+    op.create_table('computehost_trait',
+    sa.Column('created_at', sa.DateTime(), nullable=True),
+    sa.Column('updated_at', sa.DateTime(), nullable=True),
+    sa.Column('id', sa.String(length=36), nullable=False),
+    sa.Column('computehost_id', sa.String(length=36), nullable=True),
+    sa.Column('trait', sa.String(length=255), nullable=False),
+    sa.ForeignKeyConstraint(['computehost_id'], ['computehosts.id'], ),
+    sa.PrimaryKeyConstraint('id')
+    )
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_table('computehost_trait')
+    op.drop_table('computehost_resource_inventory')
+    # ### end Alembic commands ###
\ No newline at end of file
diff --git a/blazar/db/migration/alembic_migrations/versions/e20cbce0504c_add_resource_inventory.py b/blazar/db/migration/alembic_migrations/versions/e20cbce0504c_add_resource_inventory.py
new file mode 100644
index 000000000..5ba0f246d
--- /dev/null
+++ b/blazar/db/migration/alembic_migrations/versions/e20cbce0504c_add_resource_inventory.py
@@ -0,0 +1,52 @@
+# Copyright 2023 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add resource inventory
+
+Revision ID: e20cbce0504c
+Revises: 02e2f2186d98
+Create Date: 2023-07-06 14:45:39.036229
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'e20cbce0504c'
+down_revision = '02e2f2186d98'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('instance_reservations', sa.Column(
+        'resource_inventory',
+        sa.Text().with_variant(mysql.MEDIUMTEXT(), 'mysql'), nullable=True))
+    op.add_column('instance_reservations', sa.Column(
+        'resource_traits',
+        sa.Text().with_variant(mysql.MEDIUMTEXT(), 'mysql'), nullable=True))
+    op.add_column('instance_reservations', sa.Column(
+        'source_flavor',
+        sa.Text().with_variant(mysql.MEDIUMTEXT(), 'mysql'), nullable=True))
+    op.alter_column('instance_reservations', 'affinity',
+                    existing_type=mysql.TINYINT(display_width=1),
+                    nullable=True)
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.alter_column('instance_reservations', 'affinity',
+                    existing_type=mysql.TINYINT(display_width=1),
+                    nullable=False)
+    op.drop_column('instance_reservations', 'source_flavor')
+    op.drop_column('instance_reservations', 'resource_traits')
+    op.drop_column('instance_reservations', 'resource_inventory')
+    # ### end Alembic commands ###
\ No newline at end of file
diff --git a/blazar/db/sqlalchemy/api.py b/blazar/db/sqlalchemy/api.py
index a324f1c45..c5a47e2f6 100644
--- a/blazar/db/sqlalchemy/api.py
+++ b/blazar/db/sqlalchemy/api.py
@@ -746,6 +746,69 @@ def host_destroy(host_id):
         session.delete(host)
 
 
+# ComputeHostResourceInventory
+
+def host_custom_resource_create(values):
+    values = values.copy()
+
+    custom_resource = models.ComputeHostResourceInventory()
+    custom_resource.update(values)
+
+    session = get_session()
+    with session.begin():
+        try:
+            custom_resource.save(session=session)
+        except common_db_exc.DBDuplicateEntry as e:
+            # raise exception about duplicated columns (e.columns)
+            raise db_exc.BlazarDBDuplicateEntry(
+                model=custom_resource.__class__.__name__,
+                columns=e.columns)
+
+    return custom_resource
+
+
+def _host_custom_resource_get_all_per_host(session, host_id):
+    query = model_query(models.ComputeHostResourceInventory, session)
+    return query.filter_by(computehost_id=host_id)
+
+
+def host_custom_resource_get_all_per_host(host_id):
+    return _host_custom_resource_get_all_per_host(get_session(),
+                                                  host_id).all()
+
+
+# ComputeHostTrait
+
+def host_trait_create(values):
+    values = values.copy()
+
+    host_trait = models.ComputeHostTrait()
+    host_trait.update(values)
+
+    session = get_session()
+    with session.begin():
+        try:
+            host_trait.save(session=session)
+        except common_db_exc.DBDuplicateEntry as e:
+            # raise exception about duplicated columns (e.columns)
+            raise db_exc.BlazarDBDuplicateEntry(
+                model=host_trait.__class__.__name__,
+                columns=e.columns)
+
+    return host_trait
+
+
+def _host_trait_get_all_per_host(session, host_id):
+    query = model_query(models.ComputeHostTrait, session)
+    return query.filter_by(computehost_id=host_id)
+
+
+def host_trait_get_all_per_host(host_id):
+    return _host_trait_get_all_per_host(get_session(), host_id).all()
+
+
 # ComputeHostExtraCapability
 
 def _host_resource_property_query(session):
diff --git a/blazar/db/sqlalchemy/models.py b/blazar/db/sqlalchemy/models.py
index 693ef6736..61bc02266 100644
--- a/blazar/db/sqlalchemy/models.py
+++ b/blazar/db/sqlalchemy/models.py
@@ -206,6 +206,9 @@ class InstanceReservations(mb.BlazarBase):
     amount = sa.Column(sa.Integer, nullable=False)
     affinity = sa.Column(sa.Boolean, nullable=True)
     resource_properties = sa.Column(MediumText(), nullable=True)
+    resource_inventory = sa.Column(MediumText(), nullable=True)
+    resource_traits = sa.Column(MediumText(), nullable=True)
+    source_flavor = sa.Column(MediumText(), nullable=True)
     flavor_id = sa.Column(sa.String(36), nullable=True)
     aggregate_id = sa.Column(sa.Integer, nullable=True)
     server_group_id = sa.Column(sa.String(36), nullable=True)
@@ -253,11 +256,44 @@ class ComputeHost(mb.BlazarBase):
         cascade="all,delete",
         backref='computehost',
         lazy='joined')
+    computehost_resource_inventory = relationship(
+        'ComputeHostResourceInventory', cascade="all,delete",
+        backref='computehost', lazy='joined')
+    computehost_traits = relationship(
+        'ComputeHostTrait', cascade="all,delete",
+        backref='computehost', lazy='joined')
 
     def to_dict(self):
         return super(ComputeHost, self).to_dict()
 
 
+class ComputeHostResourceInventory(mb.BlazarBase):
+    __tablename__ = 'computehost_resource_inventory'
+
+    id = _id_column()
+    computehost_id = sa.Column(sa.String(36),
+                               sa.ForeignKey('computehosts.id'))
+    resource_class = sa.Column(sa.String(255), nullable=False)
+    allocation_ratio = sa.Column(sa.Float, nullable=False)
+    total = sa.Column(sa.Integer, nullable=False)
+    reserved = sa.Column(sa.Integer, nullable=False)
+    max_unit = sa.Column(sa.Integer, nullable=False)
+    min_unit = sa.Column(sa.Integer, nullable=False)
+
+    def to_dict(self):
+        return super(ComputeHostResourceInventory, self).to_dict()
+
+
+class ComputeHostTrait(mb.BlazarBase):
+    __tablename__ = 'computehost_trait'
+
+    id = _id_column()
+    computehost_id = sa.Column(sa.String(36),
+                               sa.ForeignKey('computehosts.id'))
+    trait = sa.Column(sa.String(255), nullable=False)
+
+    def to_dict(self):
+        return super(ComputeHostTrait, self).to_dict()
+
+
 class ComputeHostExtraCapability(mb.BlazarBase):
     """Description
diff --git a/blazar/manager/exceptions.py b/blazar/manager/exceptions.py
index 9883af637..221789f0a 100644
--- a/blazar/manager/exceptions.py
+++ b/blazar/manager/exceptions.py
@@ -60,6 +60,10 @@ class HostNotFound(exceptions.NotFound):
     msg_fmt = _("Host '%(host)s' not found!")
 
 
+class ResourceProviderNotFound(exceptions.NotFound):
+    msg_fmt = _("Resource provider for host '%(host)s' not found!")
+
+
 class InvalidHost(exceptions.NotAuthorized):
     msg_fmt = _("Invalid values for host %(host)s")
 
diff --git a/blazar/plugins/instances/instance_plugin.py b/blazar/plugins/instances/instance_plugin.py
index 97e883641..01184fcaa 100644
--- a/blazar/plugins/instances/instance_plugin.py
+++ b/blazar/plugins/instances/instance_plugin.py
@@ -14,6 +14,7 @@
 
 import collections
 import datetime
+import json
 
 import retrying
 
 from novaclient import exceptions as nova_exceptions
@@ -90,10 +91,24 @@ def filter_hosts_by_reservation(self, hosts, start_date, end_date,
 
         return free, non_free
 
-    def max_usages(self, host, reservations):
-        def resource_usage_by_event(event, resource_type):
-            return event['reservation']['instance_reservation'][resource_type]
-
+    def _max_usages(self, reservations):
+        def resource_usage_by_event(event):
+            instance_reservation = (
+                event['reservation']['instance_reservation'])
+            resource_inventory = instance_reservation['resource_inventory']
+            if resource_inventory:
+                resource_inventory = json.loads(resource_inventory)
+            if not resource_inventory:
+                # backwards compatible with older reservations
+                # that do not have a resource_inventory populated
+                resource_inventory = {
+                    "VCPU": instance_reservation['vcpus'],
+                    "MEMORY_MB": instance_reservation['memory_mb'],
+                    "DISK_GB": instance_reservation['disk_gb'],
+                }
+            return resource_inventory
+
+        # Get sorted list of events for all reservations
+        # that exist in the target time window
         events_list = []
         for r in reservations:
             fetched_events = db_api.event_get_all_sorted_by_filters(
                 filters={'lease_id': r['lease_id']})
             events_list.extend([{'event': e, 'reservation': r}
                                 for e in fetched_events])
-
         events_list.sort(key=lambda x: x['event']['time'])
 
-        max_vcpus = max_memory = max_disk = 0
-        current_vcpus = current_memory = current_disk = 0
-
+        current_usage = collections.defaultdict(int)
+        max_usage = collections.defaultdict(int)
         for event in events_list:
+            usage = resource_usage_by_event(event)
+
             if event['event']['event_type'] == 'start_lease':
-                current_vcpus += resource_usage_by_event(event, 'vcpus')
-                current_memory += resource_usage_by_event(event, 'memory_mb')
-                current_disk += resource_usage_by_event(event, 'disk_gb')
-                if max_vcpus < current_vcpus:
-                    max_vcpus = current_vcpus
-                if max_memory < current_memory:
-                    max_memory = current_memory
-                if max_disk < current_disk:
-                    max_disk = current_disk
+                LOG.debug(f"found start_lease {event} with {usage}")
+                for rc, usage_amount in usage.items():
+                    current_usage[rc] += usage_amount
+                    # TODO(johngarbutt) what if the max usage is
+                    # actually outside the target time window?
+                    if max_usage[rc] < current_usage[rc]:
+                        max_usage[rc] = current_usage[rc]
+
             elif event['event']['event_type'] == 'end_lease':
-                current_vcpus -= resource_usage_by_event(event, 'vcpus')
-                current_memory -= resource_usage_by_event(event, 'memory_mb')
-                current_disk -= resource_usage_by_event(event, 'disk_gb')
+                for rc, usage_amount in usage.items():
+                    current_usage[rc] -= usage_amount
 
-        return max_vcpus, max_memory, max_disk
+            LOG.debug(f"after {event}\nusage is: {current_usage}\n"
+                      f"max is: {max_usage}")
+
+        return max_usage
+
+    def _get_hosts_list(self, host_info, resource_request):
+        # For each host, look how many slots are available,
+        # given the current list of reservations within the
+        # target time window for this host
+
+        # get high water mark of usage during all reservations
+        max_usage = self._max_usages(host_info['reservations'])
+        LOG.debug(f"Max usage {host_info['host']['hypervisor_hostname']} "
+                  f"is {max_usage}")
 
-    def get_hosts_list(self, host_info, cpus, memory, disk):
-        hosts_list = []
         host = host_info['host']
-        reservations = host_info['reservations']
-        max_cpus, max_memory, max_disk = self.max_usages(host,
-                                                         reservations)
-        used_cpus, used_memory, used_disk = (cpus, memory, disk)
-        while (max_cpus + used_cpus <= host['vcpus'] and
-               max_memory + used_memory <= host['memory_mb'] and
-               max_disk + used_disk <= host['local_gb']):
+        host_crs = db_api.host_custom_resource_get_all_per_host(host['id'])
+        host_inventory = {cr['resource_class']: cr for cr in host_crs}
+        if not host_inventory:
+            # backwards compat for hosts added before we
+            # got info from placement; mirror the old limits
+            host_inventory = {
+                "VCPU": dict(total=host['vcpus'], reserved=0,
+                             min_unit=1, max_unit=host['vcpus'],
+                             allocation_ratio=1.0),
+                "MEMORY_MB": dict(total=host['memory_mb'], reserved=0,
+                                  min_unit=1, max_unit=host['memory_mb'],
+                                  allocation_ratio=1.0),
+                "DISK_GB": dict(total=host['local_gb'], reserved=0,
+                                min_unit=1, max_unit=host['local_gb'],
+                                allocation_ratio=1.0),
+            }
+        LOG.debug(f"Inventory for {host_info['host']['hypervisor_hostname']} "
+                  f"is {host_inventory}")
+
+        # see how much room for slots we have
+        hosts_list = []
+        current_usage = max_usage.copy()
+
+        def has_free_slot():
+            for rc, requested in resource_request.items():
+                if not requested:
+                    # skip things like requests for 0 vcpus
+                    continue
+
+                host_details = host_inventory.get(rc)
+                if not host_details:
+                    # host doesn't have this sort of resource
+                    LOG.debug(f"resource not found for {rc} for "
+                              f"{host_info['host']['hypervisor_hostname']}")
+                    return False
+                usage = current_usage[rc]
+
+                if requested > host_details["max_unit"]:
+                    # requested more than the max allowed by this host
+                    LOG.debug(f"request for {rc} exceeds max_unit for "
+                              f"{host_info['host']['hypervisor_hostname']}")
+                    return False
+
+                capacity = ((host_details["total"] - host_details["reserved"])
+                            * host_details["allocation_ratio"])
+                LOG.debug(f"Capacity is {capacity} for {rc} for "
+                          f"{host_info['host']['hypervisor_hostname']}")
+                if (usage + requested) > capacity:
+                    return False
+            return True
+
+        while has_free_slot():
             hosts_list.append(host)
-            used_cpus += cpus
-            used_memory += memory
-            used_disk += disk
+            for rc, requested in resource_request.items():
+                current_usage[rc] += requested
+
+        LOG.debug(f"For host {host_info['host']['hypervisor_hostname']} "
+                  f"we have {len(hosts_list)} slots.")
         return hosts_list
 
     def allocation_candidates(self, reservation):
+        self._populate_values_with_flavor_info(reservation)
         return self.pickup_hosts(None, reservation)['added']
 
     def list_allocations(self, query):
@@ -186,6 +253,8 @@ def query_allocations(self, hosts, lease_id=None, reservation_id=None):
 
     def query_available_hosts(self, cpus=None, memory=None, disk=None,
                               resource_properties=None,
+                              resource_inventory=None,
+                              resource_traits=None,
                               start_date=None, end_date=None,
                               excludes_res=None):
         """Returns a list of available hosts for a reservation.
@@ -212,16 +281,58 @@
         if resource_properties:
             filters += plugins_utils.convert_requirements(resource_properties)
 
+        LOG.debug(f"Filters are: {filters} {cpus} {resource_inventory}")
         hosts = db_api.reservable_host_get_all_by_queries(filters)
+
+        LOG.debug(f"Found some hosts from db: {hosts}")
+
+        # Remove hosts without the required custom resources
+        resource_extras = dict(resource_inventory or {})
+        # TODO(johngarbutt) can we remove vcpus,disk,etc as a special case?
+        resource_extras.pop("VCPU", None)
+        resource_extras.pop("MEMORY_MB", None)
+        resource_extras.pop("DISK_GB", None)
+        if resource_extras:
+            cr_hosts = []
+            for host in hosts:
+                host_crs = db_api.host_custom_resource_get_all_per_host(
+                    host['id'])
+                host_inventory = {cr['resource_class']: cr
+                                  for cr in host_crs}
+                host_is_ok = True
+                for rc, request in resource_extras.items():
+                    rc_info = host_inventory.get(rc)
+                    if not rc_info:
+                        host_is_ok = False
+                        LOG.debug(f"Filter out, no {rc} for {host}")
+                        break
+                    if request > rc_info['max_unit']:
+                        host_is_ok = False
+                        LOG.debug(f"Filter out, request for {rc} exceeds "
+                                  f"max_unit for {host}")
+                        break
+                if host_is_ok:
+                    cr_hosts.append(host)
+            hosts = cr_hosts
+
+        LOG.debug(f"Filtered hosts by resource classes: {hosts}")
+
+        if resource_traits:
+            # TODO(johngarbutt): filter resource traits!
+            pass
+
+        # Look for all reservations that match our time window
+        # and group that by host
         free_hosts, reserved_hosts = self.filter_hosts_by_reservation(
             hosts,
             start_date - datetime.timedelta(minutes=CONF.cleaning_time),
             end_date + datetime.timedelta(minutes=CONF.cleaning_time),
             excludes_res)
 
+        # See how many free slots are available per host
         available_hosts = []
         for host_info in (reserved_hosts + free_hosts):
-            hosts_list = self.get_hosts_list(host_info, cpus, memory, disk)
+            hosts_list = self._get_hosts_list(host_info, resource_inventory)
             available_hosts.extend(hosts_list)
 
         return available_hosts
@@ -242,11 +353,16 @@ def pickup_hosts(self, reservation_id, values):
         req_amount = values['amount']
         affinity = bool_from_string(values['affinity'], default=None)
 
+        # TODO(johngarbutt): check for custom resource requests
+        resource_inventory = json.loads(values['resource_inventory'])
+        # TODO(johngarbutt): check traits as well
+
         query_params = {
-            'cpus': values['vcpus'],
-            'memory': values['memory_mb'],
-            'disk': values['disk_gb'],
+            'cpus': resource_inventory['VCPU'],
+            'memory': resource_inventory['MEMORY_MB'],
+            'disk': resource_inventory['DISK_GB'],
             'resource_properties': values['resource_properties'],
+            'resource_inventory': resource_inventory,
             'start_date': values['start_date'],
             'end_date': values['end_date']
         }
@@ -270,6 +386,8 @@
         # 2. hosts with reservations followed by hosts without reservations
         # Note that the `candidate_id_list` has already been ordered
         # satisfying the second requirement.
+        LOG.debug(f"Found candidates: {candidate_id_list}")
         if affinity:
             host_id_map = collections.Counter(candidate_id_list)
             available = {k for k, v in host_id_map.items() if v >= req_amount}
@@ -317,7 +435,8 @@
         return {'added': added_host_ids, 'removed': removed_host_ids}
 
-    def _create_flavor(self, reservation_id, vcpus, memory, disk, group_id):
+    def _create_flavor(self, reservation_id, vcpus, memory, disk, group_id,
+                       source_flavor=None):
         flavor_details = {
             'flavorid': reservation_id,
             'name': RESERVATION_PREFIX + ":" + reservation_id,
@@ -333,9 +452,24 @@
         reservation_rc = "resources:CUSTOM_RESERVATION_" + rsv_id_rc_format
         extra_specs = {
             FLAVOR_EXTRA_SPEC: reservation_id,
-            "affinity_id": group_id,
             reservation_rc: "1"
         }
+        if group_id:
+            extra_specs["affinity_id"] = group_id
+
+        # Copy across any extra specs from the source flavor
+        # while being sure not to override the ones used above
+        if source_flavor:
+            source_flavor = json.loads(source_flavor)
+        if source_flavor:
+            extra_specs["blazar_copy_from_id"] = source_flavor["id"]
+            extra_specs["blazar_copy_from_name"] = source_flavor["name"]
+            source_extra_specs = source_flavor["extra_specs"]
+            for key, value in source_extra_specs.items():
+                if key not in extra_specs.keys():
+                    extra_specs[key] = value
+
+        LOG.debug("Setting flavor extra_specs: %s", extra_specs)
         reserved_flavor.set_keys(extra_specs)
 
         return reserved_flavor
@@ -344,30 +478,38 @@ def _create_resources(self, inst_reservation):
         reservation_id = inst_reservation['reservation_id']
 
         ctx = context.current()
-        user_client = nova.NovaClientWrapper()
-
-        reserved_group = user_client.nova.server_groups.create(
-            RESERVATION_PREFIX + ':' + reservation_id,
-            'affinity' if inst_reservation['affinity'] else 'anti-affinity'
-        )
-
+        # TODO(johngarbutt): creating the server group should be optional!
+        # user_client = nova.NovaClientWrapper()
+        # reserved_group = user_client.nova.server_groups.create(
+        #     RESERVATION_PREFIX + ':' + reservation_id,
+        #     'affinity' if inst_reservation['affinity'] else 'anti-affinity'
+        # )
+        reserved_group_id = None
+
+        # TODO(johngarbutt): traits and pci alias!?
+        resources = []
+
+        # TODO(johngarbutt): get PCPUs and more!
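+        # NOTE: a hedged sketch of how the TODO above might be tackled,
+        # feeding the stored inventory JSON into the resources list
+        # (assumes resource_inventory was written by
+        # _populate_values_with_flavor_info; not wired up in this change):
+        #
+        #     inventory = json.loads(
+        #         inst_reservation['resource_inventory'] or '{}')
+        #     for rc, amount in inventory.items():
+        #         if rc not in ("VCPU", "MEMORY_MB", "DISK_GB") and amount:
+        #             resources.append({'name': rc, 'value': amount})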
+        if not inst_reservation['vcpus']:
+            # flavors must have at least one vCPU
+            inst_reservation['vcpus'] = 1
         reserved_flavor = self._create_flavor(reservation_id,
                                               inst_reservation['vcpus'],
                                               inst_reservation['memory_mb'],
                                               inst_reservation['disk_gb'],
-                                              reserved_group.id)
+                                              reserved_group_id,
+                                              inst_reservation['source_flavor'])
 
-        pool = nova.ReservationPool()
+        pool = nova.PlacementReservationPool()
         pool_metadata = {
             RESERVATION_PREFIX: reservation_id,
             'filter_tenant_id': ctx.project_id,
-            'affinity_id': reserved_group.id
+            'affinity_id': reserved_group_id
         }
         agg = pool.create(name=reservation_id, metadata=pool_metadata)
 
         self.placement_client.create_reservation_class(reservation_id)
 
-        return reserved_flavor, reserved_group, agg
+        return reserved_flavor, reserved_group_id, agg
 
     def cleanup_resources(self, instance_reservation):
         def check_and_delete_resource(client, id):
@@ -381,7 +523,8 @@
         check_and_delete_resource(self.nova.nova.server_groups,
                                   instance_reservation['server_group_id'])
         check_and_delete_resource(self.nova.nova.flavors, reservation_id)
-        check_and_delete_resource(nova.ReservationPool(), reservation_id)
+        # TODO(johngarbutt): should we remove all aggregates in placement here?
+        check_and_delete_resource(nova.PlacementReservationPool(),
+                                  reservation_id)
 
     def update_resources(self, reservation_id):
         """Updates reserved resources in Nova.
@@ -394,7 +537,7 @@
         reservation = db_api.reservation_get(reservation_id)
 
         if reservation['status'] == 'active':
-            pool = nova.ReservationPool()
+            pool = nova.PlacementReservationPool()
 
             # Dict of number of instances to reserve on a host keyed by the
             # host id
@@ -407,9 +550,7 @@
             for host_id, num in allocation_map.items():
                 host = db_api.host_get(host_id)
                 try:
-                    pool.add_computehost(
-                        reservation['aggregate_id'],
-                        host['service_name'], stay_in=True)
+                    pool.add_computehost(reservation["aggregate_id"], host)
                 except mgr_exceptions.AggregateAlreadyHasHost:
                     pass
                 except nova_exceptions.ClientException:
@@ -421,20 +562,34 @@
         else:
             try:
                 self.nova.nova.flavors.delete(reservation['id'])
+
+                # TODO(johngarbutt): get inventory from the reservation?
+                resource_inventory = ""
+                resources = []
+                if resource_inventory:
+                    for req in resource_inventory.split(','):
+                        resource_class, amount = req.split(':')
+                        resources.append({'name': resource_class,
+                                          'value': amount})
+                # TODO(johngarbutt): traits and pci alias!?
+
                 self._create_flavor(reservation['id'],
                                     reservation['vcpus'],
                                     reservation['memory_mb'],
                                     reservation['disk_gb'],
-                                    reservation['server_group_id'])
+                                    reservation['server_group_id'],
+                                    reservation['source_flavor'])
             except nova_exceptions.ClientException:
                 LOG.exception("Failed to update Nova resources "
                               "for reservation %s", reservation['id'])
                 raise mgr_exceptions.NovaClientError()
 
     def _check_missing_reservation_params(self, values):
-        marshall_attributes = set(['vcpus', 'memory_mb', 'disk_gb',
-                                   'amount', 'affinity',
-                                   'resource_properties'])
+        marshall_attributes = set(['amount', 'affinity'])
+        # TODO(johngarbutt): do we want a config to require
+        # flavor_id and reject other requests, or an enforcer?
+        # if flavor_id is present, we ignore the components
+        # if flavor_id is not present, we require the components
+        if "flavor_id" not in values.keys():
+            marshall_attributes = marshall_attributes.union(
+                ['vcpus', 'memory_mb', 'disk_gb', 'resource_properties'])
+
         missing_attr = marshall_attributes - set(values.keys())
         if missing_attr:
             raise mgr_exceptions.MissingParameter(param=','.join(missing_attr))
@@ -453,12 +608,86 @@ def _validate_reservation_params(self, values):
             raise mgr_exceptions.MalformedParameter(
                 param='affinity (must be a bool value or None)')
 
+    def _populate_values_with_flavor_info(self, values):
+        if "resource_inventory" in values.keys():
+            return
+
+        # Look up flavor to get the reservation details
+        flavor_id = values.get('flavor_id')
+
+        # TODO(johngarbutt) hack to get the flavor in via horizon!!
+        if (not flavor_id and values.get('resource_properties')
+                and "flavor" in values['resource_properties']):
+            requirements = json.loads(values['resource_properties'])
+            flavor_id = requirements[1]
+            values['resource_properties'] = ""
+
+        resource_inventory = {}
+        resource_traits = {}
+        source_flavor = {}
+
+        if not flavor_id:
+            # create resource requests from legacy values, if present
+            resource_inventory["VCPU"] = values.get('vcpus', 0)
+            resource_inventory["MEMORY_MB"] = values.get('memory_mb', 0)
+            resource_inventory["DISK_GB"] = values.get('disk_gb', 0)
+
+        else:
+            user_client = nova.NovaClientWrapper()
+            flavor = user_client.nova.nova.flavors.get(flavor_id)
+            source_flavor = flavor.to_dict()
+            # TODO(johngarbutt): use a newer api to get this above
+            source_flavor["extra_specs"] = flavor.get_keys()
+
+            # Populate the legacy instance reservation fields,
+            # overriding what the user specified, if anything
+            values['vcpus'] = int(source_flavor['vcpus'])
+            values['memory_mb'] = int(source_flavor['ram'])
+            values['disk_gb'] = (
+                int(source_flavor['disk']) +
+                int(source_flavor['OS-FLV-EXT-DATA:ephemeral']))
+
+            # add default resource requests
+            resource_inventory["VCPU"] = values['vcpus']
+            resource_inventory["MEMORY_MB"] = values['memory_mb']
+            resource_inventory["DISK_GB"] = values['disk_gb']
+
+            # Check for PCPUs
+            hw_cpu_policy = source_flavor['extra_specs'].get("hw:cpu_policy")
+            if hw_cpu_policy == "dedicated":
+                resource_inventory["PCPU"] = source_flavor['vcpus']
+                resource_inventory["VCPU"] = 0
+
+            # Check for traits and extra resources
+            for key, value in source_flavor['extra_specs'].items():
+                if key.startswith("trait:"):
+                    trait = key.split(":")[1]
+                    if value == "required":
+                        resource_traits[trait] = "required"
+                    elif value == "forbidden":
+                        resource_traits[trait] = "forbidden"
+
+                if key.startswith("resources:"):
+                    rc = key.split(":")[1]
+                    resource_inventory[rc] = int(value)
+
+        values["resource_inventory"] = json.dumps(resource_inventory)
+        values["resource_traits"] = json.dumps(resource_traits)
+        values["source_flavor"] = json.dumps(source_flavor)
+
+        LOG.debug("Updated reservation values: %s", values)
+
     def reserve_resource(self, reservation_id, values):
         self._check_missing_reservation_params(values)
         self._validate_reservation_params(values)
 
+        # when the user specifies a flavor,
+        # populate values from that flavor
+        self._populate_values_with_flavor_info(values)
+
         hosts = self.pickup_hosts(reservation_id, values)
 
         instance_reservation_val = {
             'reservation_id': reservation_id,
             'vcpus': values['vcpus'],
@@ -466,7 +695,10 @@
             'disk_gb': values['disk_gb'],
             'amount': values['amount'],
             'affinity': bool_from_string(values['affinity'], default=None),
-            'resource_properties': values['resource_properties']
+            'resource_properties': values['resource_properties'],
+            'resource_inventory': values['resource_inventory'],
+            'resource_traits': values['resource_traits'],
+            'source_flavor': values['source_flavor'],
         }
         instance_reservation = db_api.instance_reservation_create(
             instance_reservation_val)
@@ -476,7 +708,7 @@
                            'reservation_id': reservation_id})
 
         try:
-            flavor, group, pool = self._create_resources(instance_reservation)
+            flavor, group_id, pool = self._create_resources(
+                instance_reservation)
         except nova_exceptions.ClientException:
             LOG.exception("Failed to create Nova resources "
                           "for reservation %s", reservation_id)
@@ -485,7 +717,7 @@
 
         db_api.instance_reservation_update(instance_reservation['id'],
                                            {'flavor_id': flavor.id,
-                                            'server_group_id': group.id,
+                                            'server_group_id': group_id,
                                             'aggregate_id': pool.id})
 
         return instance_reservation['id']
@@ -585,7 +817,7 @@ def on_start(self, resource_id):
                            'project_id': ctx.project_id})
                 raise mgr_exceptions.EventError()
 
-        pool = nova.ReservationPool()
+        pool = nova.PlacementReservationPool()
 
         # Dict of number of instances to reserve on a host keyed by the
         # host id
@@ -597,8 +829,7 @@
         for host_id, num in allocation_map.items():
             host = db_api.host_get(host_id)
-            pool.add_computehost(instance_reservation['aggregate_id'],
-                                 host['service_name'], stay_in=True)
+            pool.add_computehost(instance_reservation["aggregate_id"], host)
 
             self.placement_client.update_reservation_inventory(
                 host['hypervisor_hostname'], reservation_id, num)
@@ -618,6 +849,8 @@ def on_end(self, resource_id):
             reservation_id=reservation_id)
+        pool = nova.PlacementReservationPool()
         for allocation in allocations:
             host = db_api.host_get(allocation['compute_host_id'])
+            pool.remove_computehost(instance_reservation["aggregate_id"],
+                                    host)
             db_api.host_allocation_destroy(allocation['id'])
             hostnames.append(host['hypervisor_hostname'])
@@ -772,12 +1005,11 @@ def _select_host(self, reservation, lease):
 
     def _pre_reallocate(self, reservation, host_id):
         """Delete the reservation inventory/aggregates for the host."""
-        pool = nova.ReservationPool()
+        pool = nova.PlacementReservationPool()
         # Remove the failed host from the aggregate.
         if reservation['status'] == status.reservation.ACTIVE:
             host = db_api.host_get(host_id)
-            pool.remove_computehost(reservation['aggregate_id'],
-                                    host['service_name'])
+            pool.remove_computehost(reservation["aggregate_id"], host)
             try:
                 self.placement_client.delete_reservation_inventory(
                     host['hypervisor_hostname'], reservation['id'])
@@ -786,13 +1018,11 @@
 
     def _post_reallocate(self, reservation, lease, host_id, num):
         """Add the reservation inventory/aggregates for the host."""
-        pool = nova.ReservationPool()
+        pool = nova.PlacementReservationPool()
         if reservation['status'] == status.reservation.ACTIVE:
             # Add the alternative host into the aggregate.
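+            # NOTE: with PlacementReservationPool this is a placement
+            # aggregate update (PUT /resource_providers/{uuid}/aggregates)
+            # rather than a nova host-aggregate change; the nova aggregate
+            # is only consulted to look up its UUID.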
             new_host = db_api.host_get(host_id)
-            pool.add_computehost(reservation['aggregate_id'],
-                                 new_host['service_name'],
-                                 stay_in=True)
+            pool.add_computehost(reservation["aggregate_id"], new_host)
             # Here we use "additional=True" not to break the existing
             # inventory(allocations) on the new host
             self.placement_client.update_reservation_inventory(
diff --git a/blazar/plugins/oshosts/host_plugin.py b/blazar/plugins/oshosts/host_plugin.py
index b4f1d4e09..23222a4f3 100644
--- a/blazar/plugins/oshosts/host_plugin.py
+++ b/blazar/plugins/oshosts/host_plugin.py
@@ -347,11 +347,14 @@ def create_computehost(self, host_values):
 
         with trusts.create_ctx_from_trust(trust_id):
             inventory = nova.NovaInventory()
-            servers = inventory.get_servers_per_host(host_ref)
-            if servers:
-                raise manager_ex.HostHavingServers(host=host_ref,
-                                                   servers=servers)
+            # TODO(johngarbutt): hack to work around full hypervisors!
+            # servers = inventory.get_servers_per_host(host_ref)
+            # if servers:
+            #     raise manager_ex.HostHavingServers(host=host_ref,
+            #                                        servers=servers)
             host_details = inventory.get_host_details(host_ref)
+            LOG.debug(f"host details: {host_details}")
+            hostname = host_details['hypervisor_hostname']
             # NOTE(sbauza): Only last duplicate name for same extra capability
             # will be stored
             to_store = set(host_values.keys()) - set(host_details.keys())
@@ -363,12 +366,11 @@
             if any([len(key) > 64 for key in extra_capabilities_keys]):
                 raise manager_ex.ExtraCapabilityTooLong()
 
-            self.placement_client.create_reservation_provider(
-                host_details['hypervisor_hostname'])
+            self.placement_client.create_reservation_provider(hostname)
 
-            pool = nova.ReservationPool()
-            pool.add_computehost(self.freepool_name,
-                                 host_details['service_name'])
+            pool = nova.PlacementReservationPool()
+            freepool_id = pool.get_aggregate_id_from_name(self.freepool_name)
+            pool.add_computehost(freepool_id, host_details)
 
             host = None
             cantaddextracapability = []
@@ -380,10 +382,8 @@
                 # We need to rollback
                 # TODO(sbauza): Investigate use of Taskflow for atomic
                 # transactions
-                pool.remove_computehost(self.freepool_name,
-                                        host_details['service_name'])
-                self.placement_client.delete_reservation_provider(
-                    host_details['hypervisor_hostname'])
+                pool.remove_computehost(freepool_id, host_details)
+                self.placement_client.delete_reservation_provider(hostname)
                 raise e
             for key in extra_capabilities:
                 values = {'computehost_id': host['id'],
@@ -398,6 +398,36 @@
                 raise manager_ex.CantAddExtraCapability(
                     keys=cantaddextracapability,
                     host=host['id'])
+
+            # Check for any custom resource classes
+            rp = self.placement_client.get_resource_provider(hostname)
+            if rp is None:
+                raise manager_ex.ResourceProviderNotFound(host=hostname)
+
+            inventories = self.placement_client.get_inventory(rp['uuid'])
+            for rc, inventory in inventories['inventories'].items():
+                reserved = int(inventory['reserved'])
+                # Hack for when ironic nodes are not currently available;
+                # only applies where the node exposes a single unit
+                if reserved == 1 and int(inventory['total']) == 1:
+                    reserved = 0
+                cr = {
+                    'computehost_id': host['id'],
+                    'resource_class': rc,
+                    'allocation_ratio': inventory['allocation_ratio'],
+                    'total': inventory['total'],
+                    'reserved': reserved,
+                    'max_unit': inventory['max_unit'],
+                    'min_unit': inventory['min_unit'],
+                }
+                db_api.host_custom_resource_create(cr)
+
+            traits = self.placement_client.get_traits(rp['uuid'])
+            for trait in traits['traits']:
+                db_api.host_trait_create({
+                    'computehost_id': host['id'],
+                    'trait': trait,
+                })
+
         return self.get_computehost(host['id'])
 
     def is_updatable_extra_capability(self, capability, property_name):
@@ -482,16 +512,17 @@ def delete_computehost(self, host_id):
             )
 
         inventory = nova.NovaInventory()
-        servers = inventory.get_servers_per_host(
-            host['hypervisor_hostname'])
-        if servers:
-            raise manager_ex.HostHavingServers(
-                host=host['hypervisor_hostname'], servers=servers)
+        # TODO(johngarbutt): hack to allow removing a full host!
+        # servers = inventory.get_servers_per_host(
+        #     host['hypervisor_hostname'])
+        # if servers:
+        #     raise manager_ex.HostHavingServers(
+        #         host=host['hypervisor_hostname'], servers=servers)
 
         try:
-            pool = nova.ReservationPool()
-            pool.remove_computehost(self.freepool_name,
-                                    host['service_name'])
+            pool = nova.PlacementReservationPool()
+            freepool_id = pool.get_aggregate_id_from_name(self.freepool_name)
+            pool.remove_computehost(freepool_id, host)
             self.placement_client.delete_reservation_provider(
                 host['hypervisor_hostname'])
             # NOTE(sbauza): Extracapabilities will be destroyed thanks to
diff --git a/blazar/utils/openstack/nova.py b/blazar/utils/openstack/nova.py
index 26803b0af..530a81b76 100644
--- a/blazar/utils/openstack/nova.py
+++ b/blazar/utils/openstack/nova.py
@@ -26,6 +26,7 @@
 from blazar.manager import exceptions as manager_exceptions
 from blazar.plugins import oshosts
 from blazar.utils.openstack import base
+from blazar.utils.openstack import placement
 
 
 nova_opts = [
@@ -357,9 +358,11 @@ def add_computehost(self, pool, hosts, stay_in=False):
         added_hosts = []
         removed_hosts = []
         agg = self.get_aggregate_from_name_or_id(pool)
+        # TODO(johngarbutt): get the node list from placement?
 
         try:
             freepool_agg = self.get(self.freepool_name)
+            # TODO(johngarbutt): get the node list from placement?
         except manager_exceptions.AggregateNotFound:
             raise manager_exceptions.NoFreePool()
 
@@ -385,12 +388,13 @@
                 #
                 # NOTE(priteau): Preemptibles should not be used with
                 # instance reservation yet.
-                self.terminate_preemptibles(host)
+                # TODO(johngarbutt): re-enable this!
+                # self.terminate_preemptibles(host)
 
                 LOG.info("adding host '%(host)s' to aggregate %(id)s",
                          {'host': host, 'id': agg.id})
                 try:
                     self.nova.aggregates.add_host(agg.id, host)
+                    # TODO(johngarbutt): do placement instead
                     added_hosts.append(host)
                 except nova_exception.NotFound:
                     raise manager_exceptions.HostNotFound(host=host)
@@ -566,3 +570,58 @@ def get_servers_per_host(self, host):
         # a list of hosts without 'servers' attribute if no servers
         # are running on that host
         return None
+
+
+class PlacementReservationPool(object):
+    """Reservation pool that mirrors aggregate membership into placement."""
+
+    def __init__(self):
+        self.old_pool = ReservationPool()
+        # used to manage placement aggregates
+        self.placement_client = placement.BlazarPlacementClient()
+
+    @property
+    def nova(self):
+        # 2.41 is the first microversion where we get the aggregate uuid
+        # and it was available in Ocata
+        nova = BlazarNovaClient(
+            version="2.41",
+            username=CONF.os_admin_username,
+            password=CONF.os_admin_password,
+            user_domain_name=CONF.os_admin_user_domain_name,
+            project_name=CONF.os_admin_project_name,
+            project_domain_name=CONF.os_admin_project_domain_name)
+        return nova
+
+    def get_aggregate_id_from_name(self, name):
+        all_aggregates = self.nova.aggregates.list()
+        # TODO(johngarbutt): this is horrible, but the only API way
+        for agg in all_aggregates:
+            if name == agg.name:
+                return agg.id
+        raise manager_exceptions.AggregateNotFound(pool=name)
+
+    def create(self, name=None, az=None, metadata=None):
+        return self.old_pool.create(name, az, metadata)
+
+    def delete(self, nova_aggregate_id):
+        # TODO(johngarbutt): remove all hosts in placement? it's expensive!
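+        # NOTE: a possible shape for that cleanup, assuming the nova
+        # aggregate's host names match resource provider names (sketch
+        # only, not part of this change):
+        #
+        #     nova_agg = self.nova.aggregates.get(nova_aggregate_id)
+        #     for host in nova_agg.hosts:
+        #         rp = self.placement_client.get_resource_provider(host)
+        #         if rp:
+        #             self.placement_client.remove_rp_from_aggregate(
+        #                 nova_agg.uuid, rp["uuid"])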
+        return self.old_pool.delete(nova_aggregate_id, force=True)
+
+    def add_computehost(self, nova_aggregate_id, computehost):
+        rp = self.placement_client.get_resource_provider(
+            computehost["hypervisor_hostname"])
+
+        nova_agg = self.nova.aggregates.get(nova_aggregate_id)
+        aggregate_uuid = nova_agg.uuid
+
+        self.placement_client.add_rp_to_aggregate(
+            aggregate_uuid, rp["uuid"])
+
+    def remove_computehost(self, nova_aggregate_id, computehost):
+        rp = self.placement_client.get_resource_provider(
+            computehost["hypervisor_hostname"])
+
+        nova_agg = self.nova.aggregates.get(nova_aggregate_id)
+        aggregate_uuid = nova_agg.uuid
+
+        self.placement_client.remove_rp_from_aggregate(
+            aggregate_uuid, rp["uuid"])
diff --git a/blazar/utils/openstack/placement.py b/blazar/utils/openstack/placement.py
index 8a3b217d7..32fb8545d 100644
--- a/blazar/utils/openstack/placement.py
+++ b/blazar/utils/openstack/placement.py
@@ -319,6 +319,17 @@ def get_inventory(self, rp_uuid):
             return resp.json()
         raise exceptions.ResourceProviderNotFound(resource_provider=rp_uuid)
 
+    def get_traits(self, rp_uuid):
+        """Calls the placement API to get resource provider traits.
+
+        :param rp_uuid: UUID of the resource provider to get
+        """
+        url = '/resource_providers/%s/traits' % rp_uuid
+        resp = self.get(url)
+        if resp:
+            return resp.json()
+        raise exceptions.ResourceProviderNotFound(resource_provider=rp_uuid)
+
     @retrying.retry(stop_max_attempt_number=5,
                     retry_on_exception=lambda e: isinstance(
                         e, exceptions.InventoryConflict))
@@ -444,3 +455,44 @@ def delete_reservation_inventory(self, host_name, reserv_uuid):
             LOG.info("Resource class %s doesn't exist or there is no "
                      "inventory for that resource class on resource provider "
                      "%s. Skipped the deletion", rc_name, rp_name)
+
+    def get_rp_aggregates(self, rp_uuid):
+        """Call placement to get resource provider aggregates information.
+
+        :param rp_uuid: UUID of the resource provider to get
+        """
+        url = '/resource_providers/%s/aggregates' % rp_uuid
+        resp = self.get(url)
+        if resp:
+            return resp.json()
+        raise exceptions.ResourceProviderNotFound(resource_provider=rp_uuid)
+
+    def _put_rp_aggregates(self, rp_uuid, aggregate_info):
+        """Call placement to replace resource provider aggregates.
+
+        :param rp_uuid: UUID of the resource provider to update
+        """
+        url = '/resource_providers/%s/aggregates' % rp_uuid
+        resp = self.put(url, aggregate_info)
+        # TODO(johngarbutt): should we retry on conflict?
+        resp.raise_for_status()
+        return resp.json()
+
+    def add_rp_to_aggregate(self, aggregate_uuid, rp_uuid):
+        aggregate_info = self.get_rp_aggregates(rp_uuid)
+        if aggregate_uuid not in aggregate_info["aggregates"]:
+            aggregate_info["aggregates"].append(aggregate_uuid)
+            # NOTE: this includes resource_provider_generation
+            # to ensure an atomic update
+            return self._put_rp_aggregates(rp_uuid, aggregate_info)
+
+    def remove_rp_from_aggregate(self, aggregate_uuid, rp_uuid):
+        aggregate_info = self.get_rp_aggregates(rp_uuid)
+        current_aggregates = list(aggregate_info["aggregates"])
+        if aggregate_uuid in current_aggregates:
+            new_aggregates = [agg for agg in current_aggregates
+                              if agg != aggregate_uuid]
+            aggregate_info["aggregates"] = new_aggregates
+            # NOTE: this includes resource_provider_generation
+            # to ensure an atomic update
+            return self._put_rp_aggregates(rp_uuid, aggregate_info)
diff --git a/lower-constraints.txt b/lower-constraints.txt
new file mode 100644
index 000000000..e69de29bb
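A rough usage sketch of the placement helpers added above (the hostname and
aggregate_uuid are illustrative assumptions; the calls themselves are the ones
introduced in this change):

    from blazar.utils.openstack import placement

    client = placement.BlazarPlacementClient()
    rp = client.get_resource_provider('compute-0.example.com')
    # aggregate_uuid: UUID of an existing placement aggregate (assumed)
    client.add_rp_to_aggregate(aggregate_uuid, rp['uuid'])
    # ...then snapshot the provider's inventory and traits, as
    # host_plugin.create_computehost() now does when adding a host
    inventories = client.get_inventory(rp['uuid'])['inventories']
    traits = client.get_traits(rp['uuid'])['traits']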