/*
Copyright (c) 2017 OpenStack Foundation.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
	"bytes"
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	informersV1 "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/util/async"
	utilexec "k8s.io/utils/exec"

	"git.openstack.org/openstack/stackube/pkg/openstack"
	"git.openstack.org/openstack/stackube/pkg/util"

	"github.com/golang/glog"
)

const (
	defaultResyncPeriod = 15 * time.Minute
	minSyncPeriod       = 5 * time.Second
	syncPeriod          = 30 * time.Second
	burstSyncs          = 2
)

// Proxier is an iptables-based proxy for connections between a localhost:port
// and services that provide the actual backends in each network.
type Proxier struct {
	clusterDNS        string
	kubeClientset     *kubernetes.Clientset
	osClient          openstack.Interface
	iptables          iptablesInterface
	factory           informers.SharedInformerFactory
	namespaceInformer informersV1.NamespaceInformer
	serviceInformer   informersV1.ServiceInformer
	endpointInformer  informersV1.EndpointsInformer

	// endpointsChanges and serviceChanges contain all changes to endpoints and
	// services that happened since iptables was last synced. For a single object,
	// changes are accumulated, i.e. previous is the state before all of them and
	// current is the state after applying all of them.
	endpointsChanges endpointsChangeMap
	serviceChanges   serviceChangeMap
	namespaceChanges namespaceChangeMap

	mu          sync.Mutex // protects the following fields
	initialized int32
	// endpointsSynced, servicesSynced and namespaceSynced are set to true when
	// the corresponding objects are synced after startup. This is used to avoid
	// updating iptables with partial data after a stackube-proxy restart.
	endpointsSynced bool
	servicesSynced  bool
	namespaceSynced bool
	serviceMap      proxyServiceMap
	endpointsMap    proxyEndpointsMap
	namespaceMap    map[string]*namespaceInfo
	// services grouped by namespace.
	serviceNSMap map[string]proxyServiceMap
	// governs calls to syncProxyRules
	syncRunner *async.BoundedFrequencyRunner
}

// NewProxier creates a new Proxier.
func NewProxier(kubeConfig, openstackConfig string) (*Proxier, error) {
	// Create OpenStack client from config file.
	osClient, err := openstack.NewClient(openstackConfig, kubeConfig)
	if err != nil {
		return nil, fmt.Errorf("couldn't initialize openstack client: %v", err)
	}

	// Create kubernetes client config. Use kubeconfig if given, otherwise assume in-cluster.
	config, err := util.NewClusterConfig(kubeConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to build kubeconfig: %v", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to build clientset: %v", err)
	}

	clusterDNS, err := getClusterDNS(clientset)
	if err != nil {
		return nil, fmt.Errorf("failed to get cluster DNS: %v", err)
	}

	factory := informers.NewSharedInformerFactory(clientset, defaultResyncPeriod)
	execer := utilexec.New()

	proxier := &Proxier{
		kubeClientset:    clientset,
		osClient:         osClient,
		iptables:         NewIptables(execer),
		factory:          factory,
		clusterDNS:       clusterDNS,
		endpointsChanges: newEndpointsChangeMap(""),
		serviceChanges:   newServiceChangeMap(),
		namespaceChanges: newNamespaceChangeMap(),
		serviceMap:       make(proxyServiceMap),
		endpointsMap:     make(proxyEndpointsMap),
		namespaceMap:     make(map[string]*namespaceInfo),
		serviceNSMap:     make(map[string]proxyServiceMap),
	}
	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner",
		proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

	return proxier, nil
}
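// A minimal sketch of how a caller (e.g. the stackube-proxy main package,
// which is outside this file) is expected to wire a Proxier together; the
// config paths are placeholders, not defaults:
//
//	proxier, err := NewProxier("/etc/kubernetes/kubeconfig", "/etc/stackube.conf")
//	if err != nil {
//		glog.Fatalf("Failed to create proxier: %v", err)
//	}
//	proxier.RegisterInformers()
//	proxier.StartInformerFactory(wait.NeverStop)
//	if err := proxier.StartNamespaceInformer(wait.NeverStop); err != nil {
//		glog.Fatal(err)
//	}
//	if err := proxier.StartServiceInformer(wait.NeverStop); err != nil {
//		glog.Fatal(err)
//	}
//	if err := proxier.StartEndpointInformer(wait.NeverStop); err != nil {
//		glog.Fatal(err)
//	}
//	proxier.SyncLoop()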
func (p *Proxier) setInitialized(value bool) {
	var initialized int32
	if value {
		initialized = 1
	}
	atomic.StoreInt32(&p.initialized, initialized)
}

func (p *Proxier) isInitialized() bool {
	return atomic.LoadInt32(&p.initialized) > 0
}

func (p *Proxier) onServiceAdded(obj interface{}) {
	service, ok := obj.(*v1.Service)
	if !ok {
		glog.Errorf("Unexpected object type: %v", obj)
		return
	}

	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
	if p.serviceChanges.update(&namespacedName, nil, service) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onServiceUpdated(old, new interface{}) {
	oldService, ok := old.(*v1.Service)
	if !ok {
		glog.Errorf("Unexpected object type: %v", old)
		return
	}
	service, ok := new.(*v1.Service)
	if !ok {
		glog.Errorf("Unexpected object type: %v", new)
		return
	}

	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
	if p.serviceChanges.update(&namespacedName, oldService, service) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onServiceDeleted(obj interface{}) {
	service, ok := obj.(*v1.Service)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
		if service, ok = tombstone.Obj.(*v1.Service); !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
	}

	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
	if p.serviceChanges.update(&namespacedName, service, nil) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) getRouterForNamespace(namespace string) (string, error) {
	// Currently only one network per namespace is supported, and the network
	// shares its name with the namespace.
	// TODO: generalize this once multi-network is supported.
	networkName := util.BuildNetworkName(namespace, namespace)
	network, err := p.osClient.GetNetworkByName(networkName)
	if err != nil {
		glog.Errorf("Get network by name %q failed: %v", networkName, err)
		return "", err
	}

	ports, err := p.osClient.ListPorts(network.Uid, "network:router_interface")
	if err != nil {
		glog.Errorf("Get port list for network %q failed: %v", networkName, err)
		return "", err
	}

	if len(ports) == 0 {
		glog.Errorf("Got zero router interfaces for network %q", networkName)
		return "", fmt.Errorf("no router interface found")
	}

	return ports[0].DeviceID, nil
}
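// getRouterForNamespace above resolves the Neutron router for a namespace by
// listing the "network:router_interface" ports on the namespace's network and
// taking the DeviceID (the router ID) of the first one. The two osClient
// methods are defined in pkg/openstack; their shapes, as inferred from this
// call site alone, are roughly:
//
//	GetNetworkByName(name string) (network, error)            // network carries a Uid field
//	ListPorts(networkID, deviceOwner string) ([]port, error)  // each port carries a DeviceID field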
func (p *Proxier) onEndpointsAdded(obj interface{}) {
	endpoints, ok := obj.(*v1.Endpoints)
	if !ok {
		glog.Errorf("Unexpected object type: %v", obj)
		return
	}

	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
	if p.endpointsChanges.update(&namespacedName, nil, endpoints) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onEndpointUpdated(old, new interface{}) {
	oldEndpoints, ok := old.(*v1.Endpoints)
	if !ok {
		glog.Errorf("Unexpected object type: %v", old)
		return
	}
	endpoints, ok := new.(*v1.Endpoints)
	if !ok {
		glog.Errorf("Unexpected object type: %v", new)
		return
	}

	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
	if p.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onEndpointDeleted(obj interface{}) {
	endpoints, ok := obj.(*v1.Endpoints)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
		if endpoints, ok = tombstone.Obj.(*v1.Endpoints); !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
	}

	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
	if p.endpointsChanges.update(&namespacedName, endpoints, nil) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onNamespaceAdded(obj interface{}) {
	namespace, ok := obj.(*v1.Namespace)
	if !ok {
		glog.Errorf("Unexpected object type: %v", obj)
		return
	}

	if p.namespaceChanges.update(namespace.Name, nil, namespace) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onNamespaceUpdated(old, new interface{}) {
	oldNamespace, ok := old.(*v1.Namespace)
	if !ok {
		glog.Errorf("Unexpected object type: %v", old)
		return
	}
	namespace, ok := new.(*v1.Namespace)
	if !ok {
		glog.Errorf("Unexpected object type: %v", new)
		return
	}

	if p.namespaceChanges.update(oldNamespace.Name, oldNamespace, namespace) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onNamespaceDeleted(obj interface{}) {
	namespace, ok := obj.(*v1.Namespace)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
		if namespace, ok = tombstone.Obj.(*v1.Namespace); !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
	}

	if p.namespaceChanges.update(namespace.Name, namespace, nil) && p.isInitialized() {
		p.syncRunner.Run()
	}
}
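// Note on the Delete handlers above: when the informer's watch is interrupted,
// client-go may deliver a cache.DeletedFinalStateUnknown tombstone wrapping the
// last known state of the deleted object, which is why each handler unwraps
// tombstone.Obj before recording the deletion.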
// RegisterInformers registers event handlers that watch for namespace,
// service and endpoints changes.
func (p *Proxier) RegisterInformers() {
	p.namespaceInformer = p.factory.Core().V1().Namespaces()
	p.namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    p.onNamespaceAdded,
			UpdateFunc: p.onNamespaceUpdated,
			DeleteFunc: p.onNamespaceDeleted,
		}, time.Minute)

	p.serviceInformer = p.factory.Core().V1().Services()
	p.serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    p.onServiceAdded,
			UpdateFunc: p.onServiceUpdated,
			DeleteFunc: p.onServiceDeleted,
		}, time.Minute)

	p.endpointInformer = p.factory.Core().V1().Endpoints()
	p.endpointInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    p.onEndpointsAdded,
			UpdateFunc: p.onEndpointUpdated,
			DeleteFunc: p.onEndpointDeleted,
		}, time.Minute)
}

// StartNamespaceInformer starts the namespace informer.
func (p *Proxier) StartNamespaceInformer(stopCh <-chan struct{}) error {
	if !cache.WaitForCacheSync(stopCh, p.namespaceInformer.Informer().HasSynced) {
		return fmt.Errorf("failed to cache namespaces")
	}
	glog.Infof("Namespace informer cached.")

	// Update sync status.
	p.mu.Lock()
	p.namespaceSynced = true
	p.setInitialized(p.servicesSynced && p.endpointsSynced && p.namespaceSynced)
	p.mu.Unlock()

	// Sync unconditionally - this is called once per lifetime.
	p.syncProxyRules()

	return nil
}

// StartServiceInformer starts the service informer.
func (p *Proxier) StartServiceInformer(stopCh <-chan struct{}) error {
	if !cache.WaitForCacheSync(stopCh, p.serviceInformer.Informer().HasSynced) {
		return fmt.Errorf("failed to cache services")
	}
	glog.Infof("Services informer cached.")

	// Update sync status.
	p.mu.Lock()
	p.servicesSynced = true
	p.setInitialized(p.servicesSynced && p.endpointsSynced && p.namespaceSynced)
	p.mu.Unlock()

	// Sync unconditionally - this is called once per lifetime.
	p.syncProxyRules()

	return nil
}

// StartEndpointInformer starts the endpoints informer.
func (p *Proxier) StartEndpointInformer(stopCh <-chan struct{}) error {
	if !cache.WaitForCacheSync(stopCh, p.endpointInformer.Informer().HasSynced) {
		return fmt.Errorf("failed to cache endpoints")
	}
	glog.Infof("Endpoints informer cached.")

	p.mu.Lock()
	p.endpointsSynced = true
	p.setInitialized(p.servicesSynced && p.endpointsSynced && p.namespaceSynced)
	p.mu.Unlock()

	// Sync unconditionally - this is called once per lifetime.
	p.syncProxyRules()

	return nil
}

// StartInformerFactory starts the informer factory.
func (p *Proxier) StartInformerFactory(stopCh <-chan struct{}) error {
	p.factory.Start(stopCh)
	return nil
}

// SyncLoop runs periodic work. This is expected to run as a goroutine or as
// the main loop of the app. It does not return.
func (p *Proxier) SyncLoop() error {
	p.syncRunner.Loop(wait.NeverStop)
	return nil
}
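// updateCaches below drains the three change maps into the proxier's caches.
// The change types are defined elsewhere in this package; the shape assumed
// here (mirroring the upstream kube-proxy design this is adapted from) is
// roughly:
//
//	type serviceChange struct {
//		previous proxyServiceMap // state before the first unsynced change
//		current  proxyServiceMap // state after the last unsynced change
//	}
//
// so merging current and unmerging previous replays the net effect of all
// accumulated changes in a single step.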
func (p *Proxier) updateCaches() {
	// Update serviceMap.
	func() {
		p.serviceChanges.lock.Lock()
		defer p.serviceChanges.lock.Unlock()
		for _, change := range p.serviceChanges.items {
			existingPorts := p.serviceMap.merge(change.current)
			p.serviceMap.unmerge(change.previous, existingPorts)
		}
		p.serviceChanges.items = make(map[types.NamespacedName]*serviceChange)
	}()

	// Update services grouped by namespace.
	func() {
		for svc := range p.serviceMap {
			info := p.serviceMap[svc]
			if v, ok := p.serviceNSMap[svc.Namespace]; ok {
				v[svc] = info
			} else {
				p.serviceNSMap[svc.Namespace] = proxyServiceMap{svc: info}
			}
		}
	}()

	// Update endpointsMap.
	func() {
		p.endpointsChanges.lock.Lock()
		defer p.endpointsChanges.lock.Unlock()
		for _, change := range p.endpointsChanges.items {
			p.endpointsMap.unmerge(change.previous)
			p.endpointsMap.merge(change.current)
		}
		p.endpointsChanges.items = make(map[types.NamespacedName]*endpointsChange)
	}()

	// Update namespaceMap and get the router for each namespace.
	func() {
		p.namespaceChanges.lock.Lock()
		defer p.namespaceChanges.lock.Unlock()
		for n, change := range p.namespaceChanges.items {
			if change.current == nil {
				delete(p.namespaceMap, n)
			} else {
				if _, ok := p.namespaceMap[n]; !ok {
					p.namespaceMap[n] = change.current
				}

				// Get the router for the namespace.
				if p.namespaceMap[n].router == "" {
					router, err := p.getRouterForNamespace(n)
					if err != nil {
						glog.Warningf("Get router for namespace %q failed: %v. This may be caused by the network not being ready yet.", n, err)
						continue
					}
					p.namespaceMap[n].router = router
				}
			}
		}
		p.namespaceChanges.items = make(map[string]*namespaceChange)
	}()
}
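// syncProxyRules below relies on two small helpers defined elsewhere in this
// package. Their assumed behavior (mirroring upstream kube-proxy, from which
// this proxier is adapted) is roughly:
//
//	// writeLine joins words with spaces and appends the line to buf.
//	func writeLine(buf *bytes.Buffer, words ...string) {
//		buf.WriteString(strings.Join(words, " ") + "\n")
//	}
//
//	// probability formats the iptables --probability argument that gives
//	// each of the n remaining endpoints an equal chance of being picked.
//	func probability(n int) string {
//		return fmt.Sprintf("%0.5f", 1.0/float64(n))
//	}
//
// With two endpoints, the generated chain therefore reads like:
//
//	-A STACKUBE-PREROUTING ... -m statistic --mode random --probability 0.50000 -j DNAT --to-destination 192.168.1.7:80
//	-A STACKUBE-PREROUTING ... -j DNAT --to-destination 192.168.1.8:80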
func (p *Proxier) syncProxyRules() {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Don't sync rules until we've received services, endpoints and namespaces.
	if !p.servicesSynced || !p.endpointsSynced || !p.namespaceSynced {
		glog.V(2).Info("Not syncing iptables until services, endpoints and namespaces have been received from master")
		return
	}

	// Update local caches.
	p.updateCaches()

	glog.V(3).Infof("Syncing iptables rules")

	// iptablesData contains the iptables rules for the netns.
	iptablesData := bytes.NewBuffer(nil)

	// Sync iptables rules for services.
	for namespace := range p.serviceNSMap {
		iptablesData.Reset()

		// Step 1: get namespace info.
		nsInfo, ok := p.namespaceMap[namespace]
		if !ok {
			glog.Errorf("Namespace %q doesn't exist in caches", namespace)
			continue
		}
		glog.V(3).Infof("Syncing iptables for namespace %q: %v", namespace, nsInfo)

		// Step 2: try to get the router again, since the router may be created
		// well after the namespace.
		if nsInfo.router == "" {
			router, err := p.getRouterForNamespace(namespace)
			if err != nil {
				glog.Warningf("Get router for namespace %q failed: %v. This may be caused by the network not being ready yet.", namespace, err)
				continue
			}
			nsInfo.router = router
		}

		// Step 3: compose the iptables chain.
		netns := getRouterNetns(nsInfo.router)

		// Populate the netns to iptables.
		p.iptables.setNetns(netns)
		if !p.iptables.netnsExist() {
			glog.V(3).Infof("Netns %q doesn't exist, omitting the services in namespace %q", netns, namespace)
			continue
		}

		// Ensure chain STACKUBE-PREROUTING is created.
		err := p.iptables.ensureChain()
		if err != nil {
			glog.Errorf("EnsureChain %q in netns %q failed: %v", ChainSKPrerouting, netns, err)
			continue
		}

		// Link the STACKUBE-PREROUTING chain.
		err = p.iptables.ensureRule(opAddpendRule, ChainPrerouting, []string{
			"-m", "comment", "--comment", "stackube service portals", "-j", ChainSKPrerouting,
		})
		if err != nil {
			glog.Errorf("Link chain %q in netns %q failed: %v", ChainSKPrerouting, netns, err)
			continue
		}

		// Step 4: flush chain STACKUBE-PREROUTING.
		writeLine(iptablesData, "*nat")
		writeLine(iptablesData, ":"+ChainSKPrerouting, "-", "[0:0]")
		writeLine(iptablesData, opFlushChain, ChainSKPrerouting)
		writeLine(iptablesData, "COMMIT")

		// Step 5: compose rules for each service.
		glog.V(5).Infof("Syncing iptables for services %v", p.serviceNSMap[namespace])
		writeLine(iptablesData, "*nat")
		for svcName, svcInfo := range p.serviceNSMap[namespace] {
			protocol := strings.ToLower(string(svcInfo.protocol))
			svcNameString := svcInfo.serviceNameString

			// Step 5.1: check the service type.
			// Only ClusterIP services are fully supported, though the clusterIP
			// of other service types is still handled here. Note that:
			// - NodePort services are not supported since networks are L2 isolated.
			// - LoadBalancer services are handled in the service controller.
			if svcInfo.serviceType != v1.ServiceTypeClusterIP {
				glog.V(3).Infof("Only service's clusterIP is handled here, omitting other fields of service %q (type=%q)",
					svcName.NamespacedName, svcInfo.serviceType)
			}

			// Step 5.2: check endpoints.
			// If the service has no endpoints then do nothing.
			if len(p.endpointsMap[svcName]) == 0 {
				glog.V(3).Infof("No endpoints found for service %q", svcName.NamespacedName)
				continue
			}

			// Step 5.3: generate the per-endpoint rules, e.g.
			// -A STACKUBE-PREROUTING -d 10.108.230.103 -m comment --comment "default/http: cluster IP"
			//    -m tcp -p tcp --dport 80 -m statistic --mode random --probability 1.0
			//    -j DNAT --to-destination 192.168.1.7:80
			n := len(p.endpointsMap[svcName])
			for i, ep := range p.endpointsMap[svcName] {
				args := []string{
					"-A", ChainSKPrerouting,
					"-m", "comment", "--comment", svcNameString,
					"-m", protocol, "-p", protocol,
					"-d", fmt.Sprintf("%s/32", p.getServiceIP(svcInfo)),
					"--dport", strconv.Itoa(svcInfo.port),
				}

				if i < (n - 1) {
					// Each rule is a probabilistic match.
					args = append(args,
						"-m", "statistic",
						"--mode", "random",
						"--probability", probability(n-i))
				}

				// The final (or only, if n == 1) rule is a guaranteed match.
				args = append(args, "-j", "DNAT", "--to-destination", ep.endpoint)
				writeLine(iptablesData, args...)
			}
		}
		writeLine(iptablesData, "COMMIT")

		// Step 6: execute iptables-restore.
		err = p.iptables.restoreAll(iptablesData.Bytes())
		if err != nil {
			glog.Errorf("Failed to execute iptables-restore: %v", err)
			continue
		}
	}
}

func (p *Proxier) getServiceIP(serviceInfo *serviceInfo) string {
	if serviceInfo.name == "kube-dns" {
		return p.clusterDNS
	}

	return serviceInfo.clusterIP.String()
}

func getClusterDNS(client *kubernetes.Clientset) (string, error) {
	dnssvc, err := client.CoreV1().Services(metav1.NamespaceSystem).Get("kube-dns", metav1.GetOptions{})
	if err == nil && len(dnssvc.Spec.ClusterIP) > 0 {
		return dnssvc.Spec.ClusterIP, nil
	}
	if apierrors.IsNotFound(err) {
		// Fall back to the kubernetes service in the default namespace.
		k8ssvc, err := client.CoreV1().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{})
		if err != nil {
			return "", fmt.Errorf("couldn't fetch information about the kubernetes service: %v", err)
		}
		// Build an IP by taking the kubernetes service's clusterIP, appending
		// a "0", and checking that the result is a valid IP.
		dnsIPString := fmt.Sprintf("%s0", k8ssvc.Spec.ClusterIP)
		dnsIP := net.ParseIP(dnsIPString)
		if dnsIP == nil {
			return "", fmt.Errorf("could not parse dns ip %q", dnsIPString)
		}
		return dnsIP.String(), nil
	}

	return "", err
}
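// A hand-worked illustration of the fallback in getClusterDNS above, assuming
// a conventional kubeadm-style service CIDR: if the kubernetes service has
// clusterIP 10.96.0.1, appending "0" yields "10.96.0.10", which is the
// customary kube-dns service IP in such clusters.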