# Copyright 2012 Cloudbase Solutions Srl
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import re

import netaddr
from oslo_log import log as oslo_logging

from cloudbaseinit import exception
from cloudbaseinit.models import network as network_model
from cloudbaseinit.osutils import factory as osutils_factory
from cloudbaseinit.plugins.common import base as plugin_base
from cloudbaseinit.utils import network


LOG = oslo_logging.getLogger(__name__)

# Mandatory network details are marked with True. And
# if the key is a tuple, then at least one field must exist.
NET_REQUIRE = {
    # Not enforced any more: the MAC may be omitted because the name is
    # mandatory (next entry) and the MAC is resolved from the matched adapter.
    ("name", "mac"): False,
    "name": True,
    ("address", "address6"): True,
    ("netmask", "netmask6"): True,
    "broadcast": False,    # currently not used
    ("gateway", "gateway6"): False,
    "dnsnameservers": False
}

BOND_FORMAT_STR = "bond_%s"


def _name2idx(name):
    """Get the position of a network interface by its name.

    The index is the number following "eth" (case-insensitive) anywhere in
    `name`, e.g. "eth1" -> 1.

    :raises CloudbaseInitException: if `name` contains no "eth<N>" token.
    """
    match = re.search(r"eth(\d+)", name, re.I)
    if not match:
        raise exception.CloudbaseInitException(
            "invalid NetworkDetails name {!r}"
            .format(name)
        )
    return int(match.group(1))


def _preprocess_nics(network_details, network_adapters):
    """Check NICs and fill missing data if possible.

    Every NetworkDetails entry is validated against NET_REQUIRE and then
    bound to a local adapter, trying in order:

    1. exact adapter name,
    2. MAC address (case-insensitive),
    3. position derived from an "eth<N>" style name.

    :param network_details: iterable of NetworkDetails objects.
    :param network_adapters: iterable of (name, mac) pairs.
    :returns: list of new NetworkDetails objects with the MAC filled in.
    :raises CloudbaseInitException: if there are no adapters, an entry is
        not a NetworkDetails object, or no MAC can be determined for a
        valid entry. Incomplete entries are logged and skipped.
    """
    # Initial checks.
    if not network_adapters:
        raise exception.CloudbaseInitException(
            "no network adapters available")
    # Sort VM adapters by name (assuming that those
    # from the context are in correct order).
    # Do this for a better matching by order
    # if hardware address is missing.
    network_adapters = sorted(network_adapters, key=lambda arg: arg[0])
    refined_network_details = []    # store here processed interfaces
    adapters_by_name = {adapter[0]: adapter for adapter in network_adapters}
    adapters_by_mac = {adapter[1].lower(): adapter
                       for adapter in network_adapters if adapter[1]}
    # Check and update every NetworkDetails object.
    total = len(network_adapters)
    for nic in network_details:
        if not isinstance(nic, network_model.NetworkDetails):
            raise exception.CloudbaseInitException(
                "invalid NetworkDetails object {!r}"
                .format(type(nic))
            )
        # Check requirements.
        final_status = True
        for fields, status in NET_REQUIRE.items():
            if not status:
                continue    # skip 'not required' entries
            if not isinstance(fields, tuple):
                fields = (fields,)
            final_status = any(getattr(nic, field) for field in fields)
            if not final_status:
                break
        address, netmask = nic.address, nic.netmask
        if final_status:
            # Additional check for info version.
            if not (address and netmask):
                final_status = nic.address6 and nic.netmask6
                if final_status:
                    address = address or network.address6_to_4_truncate(
                        nic.address6)
                    netmask = netmask or network.netmask6_to_4_truncate(
                        nic.netmask6)
        if not final_status:
            LOG.error("Incomplete NetworkDetails object %s", nic)
            continue

        mac = nic.mac
        if nic.name and nic.name in adapters_by_name:
            # Strategy 1: adapter name. The adapter's MAC wins over the
            # one from the configuration.
            mac = adapters_by_name[nic.name][1]
            LOG.info("Matched network adapter by name: %s -> MAC: %s",
                     nic.name, mac)
        elif nic.mac and nic.mac.lower() in adapters_by_mac:
            # Strategy 2: MAC address. Keep the adapter's own spelling of
            # the MAC so that later exact-match lookups keyed by the
            # adapter MAC still find this entry.
            adapter_info = adapters_by_mac[nic.mac.lower()]
            mac = adapter_info[1]
            LOG.info("Matched network adapter by MAC: %s -> Name: %s",
                     nic.mac, adapter_info[0])
        elif nic.name:
            # Strategy 3: position encoded in the name ("eth<N>").
            try:
                idx = _name2idx(nic.name)
            except exception.CloudbaseInitException:
                idx = None    # not an "eth<N>" name, nothing to match
            if idx is not None and idx < total:
                mac = network_adapters[idx][1]
                LOG.info(
                    "Matched network adapter by index %d: %s -> MAC: %s",
                    idx, nic.name, mac)

        if not mac:
            available_names = [adapter[0] for adapter in network_adapters]
            available_macs = [adapter[1] for adapter in network_adapters
                              if adapter[1]]
            raise exception.CloudbaseInitException(
                "Cannot find network adapter for name: '{}', MAC: '{}'. "
                "Available adapter names: {}. Available MACs: {}".format(
                    nic.name, nic.mac, available_names, available_macs))

        nic = network_model.NetworkDetails(
            nic.name,
            mac,
            address,
            nic.address6,
            netmask,
            nic.netmask6,
            nic.broadcast,
            nic.gateway,
            nic.gateway6,
            nic.dnsnameservers
        )
        refined_network_details.append(nic)
    return refined_network_details


def _set_dns_search_domains_registry(search_domains):
    """Set DNS search domains via Windows registry.

    Writes the comma-joined `search_domains` to the "SearchList" value of
    the Tcpip parameters key. If no "Domain" value exists yet, the first
    search domain is also written as "Domain"; permission errors while
    reading or writing "Domain" are logged at debug level and ignored.

    :param search_domains: ordered list of domain names.
    :raises Exception: any other failure is logged and re-raised.
    """
    try:
        # Imported lazily: winreg only exists on Windows.
        import winreg

        key_path = r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters"
        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, key_path, 0,
                            winreg.KEY_SET_VALUE) as key:
            search_list = ','.join(search_domains)
            winreg.SetValueEx(key, "SearchList", 0, winreg.REG_SZ,
                              search_list)
            LOG.info("Set DNS SearchList in registry: %s", search_list)

            # "Domain" is best effort: the write handle above has no read
            # access, so a separate read-only handle is used for the query.
            try:
                with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, key_path, 0,
                                    winreg.KEY_READ) as read_key:
                    try:
                        winreg.QueryValueEx(read_key, "Domain")
                        LOG.debug("Domain registry value already exists, "
                                  "not modifying")
                    except FileNotFoundError:
                        # Domain not set, use the first search domain.
                        if search_domains:
                            try:
                                winreg.SetValueEx(key, "Domain", 0,
                                                  winreg.REG_SZ,
                                                  search_domains[0])
                                LOG.info("Set DNS Domain in registry: %s",
                                         search_domains[0])
                            except PermissionError:
                                LOG.debug(
                                    "Permission denied setting Domain "
                                    "registry value, but SearchList was "
                                    "set successfully")
            except PermissionError:
                LOG.debug("Permission denied reading Domain registry value, "
                          "but SearchList was set successfully")
    except Exception as e:
        LOG.error("Failed to set DNS search domains via registry: %s", e)
        raise


class NetworkConfigPlugin(plugin_base.BasePlugin):
    """Apply static network configuration supplied by the metadata service."""

    def _process_network_details(self, network_details):
        """Configure adapters from legacy (v1) NetworkDetails entries.

        Adapters are paired with a configuration by adapter name first and
        by MAC address second.

        :returns: (PLUGIN_EXECUTION_DONE, reboot_required)
        """
        osutils = osutils_factory.get_os_utils()

        # Check and save NICs by MAC and by name.
        network_adapters = osutils.get_network_adapters()
        network_details = _preprocess_nics(network_details, network_adapters)
        macnics = {}
        namenics = {}
        for nic in network_details:
            # Assuming that the MAC address is unique.
            if nic.mac:
                macnics[nic.mac] = nic
            if nic.name:
                namenics[nic.name] = nic

        reboot_required = False
        configured = False
        for adapter_name, adapter_mac in network_adapters:
            nic = None
            if adapter_name in namenics:
                nic = namenics[adapter_name]
                LOG.info("Found NIC config for adapter name: %s",
                         adapter_name)
            elif adapter_mac and adapter_mac in macnics:
                nic = macnics[adapter_mac]
                LOG.info("Found NIC config for adapter MAC: %s (name: %s)",
                         adapter_mac, adapter_name)

            if not nic:
                LOG.debug(
                    "No network configuration found for adapter: %s "
                    "(MAC: %s)", adapter_name, adapter_mac)
                continue

            # Remove from the lookups to track unused configs. The matched
            # config's own keys are dropped too: a config found through its
            # MAC usually carries a name that differs from the adapter's
            # and would otherwise be reported as unused below.
            macnics.pop(adapter_mac, None)
            namenics.pop(adapter_name, None)
            if macnics.get(nic.mac) is nic:
                del macnics[nic.mac]
            if namenics.get(nic.name) is nic:
                del namenics[nic.name]

            LOG.info("Configuring network adapter: %s", adapter_name)

            # In v6 only case, nic.address and nic.netmask could be unset
            if nic.address and nic.netmask:
                reboot = osutils.set_static_network_config(
                    adapter_name,    # actual adapter name, not the config's
                    nic.address,
                    nic.netmask,
                    nic.gateway,
                    nic.dnsnameservers or []
                )
                reboot_required = reboot or reboot_required
            # Set v6 info too if available.
            if nic.address6 and nic.netmask6:
                reboot = osutils.set_static_network_config(
                    adapter_name,    # actual adapter name, not the config's
                    nic.address6,
                    nic.netmask6,
                    nic.gateway6,
                    []
                )
                reboot_required = reboot or reboot_required
            configured = True

        for mac in macnics:
            LOG.warning(
                "Network configuration not used for MAC: %s "
                "(config name: %s)", mac, macnics[mac].name)
        for name in namenics:
            LOG.warning("Network configuration not used for name: %s", name)

        if not configured:
            LOG.error("No adapters were configured")

        return plugin_base.PLUGIN_EXECUTION_DONE, reboot_required

    @staticmethod
    def _process_link_common(osutils, link):
        """Set the MTU (when given) and the enabled state of `link.name`."""
        if link.mtu:
            LOG.debug(
                "Setting MTU on network adapter \"%(name)s\": %(mtu)s",
                {"name": link.name, "mtu": link.mtu})
            osutils.set_network_adapter_mtu(link.name, link.mtu)

        LOG.debug(
            "Enable network adapter \"%(name)s\": %(enabled)s",
            {"name": link.name, "enabled": link.enabled})
        osutils.enable_network_adapter(link.name, link.enabled)

    @staticmethod
    def _process_physical_links(osutils, network_details):
        """Locate, optionally rename and set up every physical link.

        The adapter for a link is looked up through the link's `match`
        criteria (name, then MAC address), then the link name itself, then
        the link MAC address. Links with no matching adapter are logged
        and skipped.
        """
        physical_links = [
            link for link in network_details.links
            if link.type == network_model.LINK_TYPE_PHYSICAL]

        for link in physical_links:
            adapter_name = None
            # Queried per link on purpose: a rename done for a previous
            # link changes the set of adapter names.
            available_adapters = osutils.get_network_adapters()
            adapter_names = [adapter[0] for adapter in available_adapters]

            # Strategy 1: v2 format match criteria.
            match_criteria = getattr(link, 'match', None)
            if match_criteria:
                if 'name' in match_criteria:
                    match_name = match_criteria['name']
                    if match_name in adapter_names:
                        adapter_name = match_name
                        LOG.info("Found adapter by v2 match name: %s",
                                 match_name)
                    else:
                        LOG.warning(
                            "Adapter name '%s' from v2 match not found. "
                            "Available: %s", match_name, adapter_names)

                # Tried whenever the name did not resolve, including the
                # case where a 'name' criterion exists but did not match.
                if not adapter_name and 'macaddress' in match_criteria:
                    try:
                        adapter_name = (
                            osutils.get_network_adapter_name_by_mac_address(
                                match_criteria['macaddress']))
                        LOG.info("Found adapter by v2 match MAC: %s -> %s",
                                 match_criteria['macaddress'], adapter_name)
                    except Exception as e:
                        LOG.warning("Could not find adapter by MAC '%s': %s",
                                    match_criteria['macaddress'], e)

            # Strategy 2: direct name matching (v1 style or link name).
            if not adapter_name and link.name in adapter_names:
                adapter_name = link.name
                LOG.info("Found adapter by direct name match: %s", link.name)

            # Strategy 3: MAC-based lookup fallback.
            if (not adapter_name and hasattr(link, 'mac_address') and
                    link.mac_address):
                try:
                    adapter_name = (
                        osutils.get_network_adapter_name_by_mac_address(
                            link.mac_address))
                    LOG.info("Found adapter by MAC lookup: %s -> %s",
                             link.mac_address, adapter_name)
                except Exception as e:
                    LOG.warning(
                        "Could not find adapter for link %s (MAC: %s): %s",
                        link.name, link.mac_address, e)

            if not adapter_name:
                LOG.error("Could not find network adapter for link: %s",
                          link.name)
                if hasattr(link, 'match'):
                    LOG.error("Match criteria: %s", link.match)
                LOG.error("Available adapters: %s", adapter_names)
                continue

            # Rename only if the desired name differs and is still free.
            if adapter_name != link.name and link.name not in adapter_names:
                LOG.info(
                    "Renaming network adapter \"%(old_name)s\" to "
                    "\"%(new_name)s\"",
                    {"old_name": adapter_name, "new_name": link.name})
                try:
                    osutils.rename_network_adapter(adapter_name, link.name)
                except Exception as e:
                    LOG.warning("Failed to rename adapter %s to %s: %s",
                                adapter_name, link.name, e)

            # NOTE(review): this always addresses the adapter as link.name,
            # also when the rename above was skipped or failed - confirm
            # that this is the intended behaviour.
            NetworkConfigPlugin._process_link_common(osutils, link)

    @staticmethod
    def _process_bond_links(osutils, network_details):
        """Create a network team for every bond link and set it up."""
        bond_links = [
            link for link in network_details.links
            if link.type == network_model.LINK_TYPE_BOND]

        for link in bond_links:
            bond_name = BOND_FORMAT_STR % link.id
            primary_nic_vlan_id = None
            LOG.info("Creating network team: %s", bond_name)
            osutils.create_network_team(
                bond_name, link.bond.type, link.bond.lb_algorithm,
                link.bond.members, link.mac_address, link.name,
                primary_nic_vlan_id, link.bond.lacp_rate)

            NetworkConfigPlugin._process_link_common(osutils, link)

    @staticmethod
    def _process_vlan_links(osutils, network_details):
        """Add a team NIC for every VLAN link and set it up."""
        vlan_links = [
            link for link in network_details.links
            if link.type == network_model.LINK_TYPE_VLAN]

        for link in vlan_links:
            bond_name = BOND_FORMAT_STR % link.vlan_link
            LOG.info(
                "Creating bond network adapter \"%(nic_name)s\" on team "
                "\"%(bond_name)s\" with VLAN: %(vlan_id)s",
                {"nic_name": link.name, "bond_name": bond_name,
                 "vlan_id": link.vlan_id})
            osutils.add_network_team_nic(bond_name, link.name, link.vlan_id)

            NetworkConfigPlugin._process_link_common(osutils, link)

    @staticmethod
    def _get_default_dns_nameservers(network_details):
        """Collect the nameservers of all NameServerService entries.

        :returns: tuple (ipv4_nameservers, ipv6_nameservers); an address
            that is not a valid IPv6 address is placed in the IPv4 list.
        """
        ipv4_nameservers = []
        ipv6_nameservers = []

        for s in network_details.services:
            if isinstance(s, network_model.NameServerService):
                for nameserver in s.addresses:
                    if netaddr.valid_ipv6(nameserver):
                        ipv6_nameservers.append(nameserver)
                    else:
                        ipv4_nameservers.append(nameserver)

        return (ipv4_nameservers, ipv6_nameservers)

    @staticmethod
    def _process_networks(osutils, network_details):
        """Apply the static IP configuration of every network.

        DNS search domains found on the networks are merged (first
        occurrence wins, order preserved) and written once to the registry;
        a failure to do so is only logged.

        :returns: True if any adapter configuration requires a reboot.
        """
        reboot_required = False
        ipv4_ns, ipv6_ns = NetworkConfigPlugin._get_default_dns_nameservers(
            network_details)

        # A list, not a set: the order of the search domains matters.
        all_search_domains = []

        for net in network_details.networks:
            ip_address, prefix_len = net.address_cidr.split("/")

            gateway = None
            default_gw_route = [
                r for r in net.routes
                if netaddr.IPNetwork(r.network_cidr).prefixlen == 0]
            if default_gw_route:
                gateway = default_gw_route[0].gateway

            nameservers = net.dns_nameservers
            if not nameservers:
                if netaddr.valid_ipv6(ip_address):
                    nameservers = ipv6_ns
                else:
                    nameservers = ipv4_ns

            # The attribute is looked up under both names; the first one
            # that exists and is non-empty is used.
            search_domains = (getattr(net, 'dns_search_domains', None) or
                              getattr(net, 'search_domains', None) or [])
            if search_domains:
                for domain in search_domains:
                    if domain not in all_search_domains:
                        all_search_domains.append(domain)
                LOG.info(
                    "Found search domains for network %s (in order): %s",
                    net.link, search_domains)

            LOG.info(
                "Setting static IP configuration on network adapter "
                "\"%(name)s\". IP: %(ip)s, prefix length: %(prefix_len)s, "
                "gateway: %(gateway)s, dns: %(dns)s",
                {"name": net.link, "ip": ip_address,
                 "prefix_len": prefix_len, "gateway": gateway,
                 "dns": nameservers})
            reboot = osutils.set_static_network_config(
                net.link, ip_address, prefix_len, gateway, nameservers)
            reboot_required = reboot or reboot_required

        if all_search_domains:
            LOG.info("Setting global DNS search domains (in order): %s",
                     all_search_domains)
            try:
                _set_dns_search_domains_registry(all_search_domains)
                LOG.info("Successfully set DNS search domains via registry")
            except Exception as e:
                LOG.warning("Failed to set DNS search domains: %s", e)

        return reboot_required

    def _set_dns_search_domains_registry(self, search_domains):
        """Set DNS search domains via Windows registry.

        Kept for compatibility; delegates to the module-level function of
        the same name so that there is a single implementation.
        """
        # Inside a method body this bare name resolves to the module-level
        # function, not to this method.
        return _set_dns_search_domains_registry(search_domains)

    @staticmethod
    def _process_network_details_v2(network_details):
        """Configure links and networks from a v2 network details object.

        :returns: (PLUGIN_EXECUTION_DONE, reboot_required)
        """
        osutils = osutils_factory.get_os_utils()

        NetworkConfigPlugin._process_physical_links(
            osutils, network_details)
        NetworkConfigPlugin._process_bond_links(osutils, network_details)
        NetworkConfigPlugin._process_vlan_links(osutils, network_details)
        reboot_required = NetworkConfigPlugin._process_networks(
            osutils, network_details)

        return plugin_base.PLUGIN_EXECUTION_DONE, reboot_required

    def _process_v2_search_domains(self, service):
        """Extract and set search domains from v2 network configuration.

        The raw configuration is fetched from `service` through whichever
        accessor is available, parsed (YAML first, JSON as fallback) and
        the `nameservers.search` lists of all `ethernets` entries are
        merged in order without duplicates.

        :returns: True if search domains were found and written to the
            registry, False otherwise. Never raises; failures are logged.
        """
        try:
            network_config = None

            # Method 1: metadata cache, trying both spellings of the path.
            # The second path is only tried if the first one raised.
            if hasattr(service, '_get_cache_data'):
                for path in ('network-config', 'network_config'):
                    try:
                        network_config = service._get_cache_data(path)
                    except Exception:
                        continue
                    LOG.debug("Got network config from cache path '%s'",
                              path)
                    break

            # Method 2: dedicated accessor.
            if not network_config and hasattr(service, 'get_network_config'):
                try:
                    network_config = service.get_network_config()
                    LOG.debug(
                        "Got network config from get_network_config method")
                except Exception:
                    pass

            # Method 3: generic content accessor.
            if not network_config and hasattr(service, 'get_content'):
                for path, message in (
                        ('network-config',
                         "Got network config from get_content method"),
                        ('network_config',
                         "Got network config from get_content method (alt)")):
                    try:
                        network_config = service.get_content(path)
                    except Exception:
                        continue
                    LOG.debug(message)
                    break

            # Method 4: internal metadata mapping.
            if not network_config:
                try:
                    metadata = getattr(service, '_metadata', None)
                    if metadata:
                        for key in ('network-config', 'network_config'):
                            if key in metadata:
                                network_config = metadata[key]
                                LOG.debug("Got network config from "
                                          "internal metadata")
                                break
                except Exception:
                    pass

            if not network_config:
                LOG.debug(
                    "Could not retrieve network configuration from service")
                return False

            # Handle bytes objects by decoding them first.
            if isinstance(network_config, bytes):
                try:
                    network_config = network_config.decode('utf-8')
                    LOG.debug("Decoded network config from bytes to string")
                except Exception as e:
                    LOG.warning("Failed to decode network config bytes: %s",
                                e)
                    return False

            # Parse the configuration if it's a string.
            if isinstance(network_config, str):
                import yaml
                try:
                    network_config = yaml.safe_load(network_config)
                    LOG.debug("Parsed network config from YAML string")
                except Exception:
                    import json
                    try:
                        network_config = json.loads(network_config)
                        LOG.debug("Parsed network config from JSON string")
                    except Exception:
                        LOG.warning("Could not parse network config string")
                        return False

            if not isinstance(network_config, dict):
                LOG.debug("Network config is not a dictionary: %s",
                          type(network_config))
                return False

            LOG.debug("Network config structure: %s",
                      list(network_config.keys()))

            # The payload may be wrapped in a top-level 'network' key.
            if 'network' in network_config:
                config_data = network_config['network']
                LOG.debug("Found 'network' wrapper in config")
            else:
                config_data = network_config
                LOG.debug("Using config data directly")

            if config_data.get('version') != 2:
                LOG.debug("Not a v2 network config, version: %s",
                          config_data.get('version'))
                return False

            # A list, not a set: the order of the search domains matters.
            search_domains = []

            # "or {}" also covers keys that are present but null.
            ethernets = config_data.get('ethernets') or {}
            LOG.debug("Found ethernets section with %d interfaces",
                      len(ethernets))

            for interface_name, interface_config in ethernets.items():
                LOG.debug("Processing interface: %s", interface_name)

                nameservers = (interface_config or {}).get(
                    'nameservers') or {}
                if 'search' in nameservers:
                    search_list = nameservers['search']
                    if isinstance(search_list, list):
                        for domain in search_list:
                            if domain not in search_domains:
                                search_domains.append(domain)
                        LOG.info(
                            "Found search domains in interface %s "
                            "(in order): %s", interface_name, search_list)
                    else:
                        LOG.debug("Search list is not a list: %s",
                                  type(search_list))
                else:
                    LOG.debug(
                        "No search domains found in interface %s "
                        "nameservers", interface_name)

            if search_domains:
                LOG.info(
                    "Setting DNS search domains from v2 config "
                    "(in order): %s", search_domains)
                try:
                    _set_dns_search_domains_registry(search_domains)
                    return True
                except Exception as e:
                    LOG.warning(
                        "Failed to set search domains from v2 config: %s", e)
            else:
                LOG.debug("No search domains found in v2 configuration")

        except Exception as e:
            LOG.warning("Error processing v2 search domains: %s", e)
            import traceback
            LOG.debug("Full traceback: %s", traceback.format_exc())

        return False

    def execute(self, service, shared_data):
        """Apply the network configuration exposed by `service`.

        The v2 network details are preferred; the legacy details are used
        only when no v2 details are available. Search-domain handling is
        best effort and never fails the plugin.

        :returns: (PLUGIN_EXECUTION_DONE, reboot_required)
        """
        network_details = service.get_network_details_v2()
        if network_details:
            result, reboot_required = self._process_network_details_v2(
                network_details)

            try:
                search_domain_processed = self._process_v2_search_domains(
                    service)
                if search_domain_processed:
                    LOG.info("Successfully processed v2 search domains")
            except Exception as e:
                LOG.warning("Could not process v2 search domains: %s", e)

            return result, reboot_required

        network_details = service.get_network_details()
        if network_details:
            result, reboot_required = self._process_network_details(
                network_details)

            try:
                nameserver_reboot = self._process_v1_nameserver_config(
                    service)
                reboot_required = reboot_required or nameserver_reboot
            except Exception as e:
                LOG.warning("Could not process nameserver configuration: %s",
                            e)

            return result, reboot_required

        return plugin_base.PLUGIN_EXECUTION_DONE, False

    def _process_v1_nameserver_config(self, service):
        """Process nameserver configurations from network config v1 format.

        Looks for entries of type 'nameserver' in the 'config' list of the
        raw network configuration and applies the first non-empty 'search'
        list, through osutils when it offers a suitable method and through
        the registry otherwise.

        :returns: True as soon as one search list was applied, else False.
            The caller ORs this value into its reboot_required flag.
            NOTE(review): confirm that applying search domains is really
            meant to request a reboot.
        """
        osutils = osutils_factory.get_os_utils()

        try:
            # Try to get raw network configuration.
            if hasattr(service, '_get_cache_data'):
                # NOTE(review): called without a path here, while
                # _process_v2_search_domains passes one - confirm which
                # signature the service really has. A TypeError would be
                # caught by the outer handler and reported as a warning.
                cache_data = service._get_cache_data()
                if cache_data and 'network-config' in cache_data:
                    network_config = cache_data['network-config']
                elif cache_data and 'network_config' in cache_data:
                    network_config = cache_data['network_config']
                else:
                    return False
            else:
                # Alternative approach for different service types.
                try:
                    network_config = service.get_network_config()
                except Exception:
                    return False

            if not network_config:
                return False

            # Parse the configuration if it's a string (YAML/JSON).
            if isinstance(network_config, str):
                import yaml
                try:
                    network_config = yaml.safe_load(network_config)
                except Exception:
                    import json
                    try:
                        network_config = json.loads(network_config)
                    except Exception:
                        return False

            if (not isinstance(network_config, dict) or
                    'config' not in network_config):
                return False

            # Find nameserver configurations.
            nameserver_configs = [
                entry for entry in network_config.get('config', [])
                if isinstance(entry, dict) and
                entry.get('type') == 'nameserver'
            ]

            if not nameserver_configs:
                return False

            for ns_config in nameserver_configs:
                search_domains = ns_config.get('search', [])
                if not search_domains:
                    continue

                LOG.info("Setting DNS search domains: %s", search_domains)
                try:
                    # Prefer an osutils implementation when one exists,
                    # fall back to writing the registry directly.
                    if hasattr(osutils, 'set_dns_search_domains'):
                        osutils.set_dns_search_domains(search_domains)
                        LOG.info(
                            "Successfully set DNS search domains via osutils")
                        return True
                    elif hasattr(osutils, 'set_dns_client_search_list'):
                        osutils.set_dns_client_search_list(search_domains)
                        LOG.info("Successfully set DNS search domains via "
                                 "client search list")
                        return True
                    else:
                        _set_dns_search_domains_registry(search_domains)
                        LOG.info("Successfully set DNS search domains via "
                                 "registry")
                        return True
                except Exception as e:
                    LOG.warning("Failed to set DNS search domains: %s", e)

        except Exception as e:
            LOG.warning("Error processing nameserver configuration: %s", e)

        return False