/*
Copyright (c) 2017 OpenStack Foundation.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
	"bytes"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	informersV1 "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/util/async"

	"git.openstack.org/openstack/stackube/pkg/openstack"
	"git.openstack.org/openstack/stackube/pkg/util"

	"github.com/golang/glog"
)

const (
	defaultResyncPeriod = 15 * time.Minute
	minSyncPeriod       = 5 * time.Second
	syncPeriod          = 30 * time.Second
	burstSyncs          = 2
)

// Proxier programs DNAT iptables rules for Kubernetes services inside each
// namespace's router network namespace.
type Proxier struct {
	kubeClientset     *kubernetes.Clientset
	osClient          *openstack.Client
	factory           informers.SharedInformerFactory
	namespaceInformer informersV1.NamespaceInformer
	serviceInformer   informersV1.ServiceInformer
	endpointInformer  informersV1.EndpointsInformer

	// endpointsChanges and serviceChanges contain all changes to endpoints and
	// services that happened since iptables was last synced. For a single object,
	// changes are accumulated, i.e. previous is the state from before all of them,
	// current is the state after applying all of those.
	endpointsChanges endpointsChangeMap
	serviceChanges   serviceChangeMap
	namespaceChanges namespaceChangeMap

	mu          sync.Mutex // protects the following fields
	initialized int32
	// endpointsSynced and servicesSynced are set to true when the corresponding
	// objects are synced after startup. This is used to avoid updating iptables
	// with partial data after a kube-proxy restart.
	endpointsSynced bool
	servicesSynced  bool
	namespaceSynced bool
	serviceMap      proxyServiceMap
	endpointsMap    proxyEndpointsMap
	namespaceMap    map[string]*namespaceInfo
	// serviceNSMap groups services by namespace.
	serviceNSMap map[string]proxyServiceMap
	// syncRunner governs calls to syncProxyRules.
	syncRunner *async.BoundedFrequencyRunner
}

// NewProxier creates a new Proxier.
func NewProxier(kubeConfig, openstackConfig string) (*Proxier, error) {
	// Create OpenStack client from config file.
	osClient, err := openstack.NewClient(openstackConfig, kubeConfig)
	if err != nil {
		return nil, fmt.Errorf("couldn't initialize openstack client: %v", err)
	}

	// Create kubernetes client config. Use kubeconfig if given, otherwise assume in-cluster.
	config, err := util.NewClusterConfig(kubeConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to build kubeconfig: %v", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to build clientset: %v", err)
	}

	factory := informers.NewSharedInformerFactory(clientset, defaultResyncPeriod)

	proxier := &Proxier{
		kubeClientset:    clientset,
		osClient:         osClient,
		factory:          factory,
		endpointsChanges: newEndpointsChangeMap(""),
		serviceChanges:   newServiceChangeMap(),
		namespaceChanges: newNamespaceChangeMap(),
		serviceMap:       make(proxyServiceMap),
		endpointsMap:     make(proxyEndpointsMap),
		namespaceMap:     make(map[string]*namespaceInfo),
		serviceNSMap:     make(map[string]proxyServiceMap),
	}
	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner",
		proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

	return proxier, nil
}

func (proxier *Proxier) setInitialized(value bool) {
	var initialized int32
	if value {
		initialized = 1
	}
	atomic.StoreInt32(&proxier.initialized, initialized)
}

func (proxier *Proxier) isInitialized() bool {
	return atomic.LoadInt32(&proxier.initialized) > 0
}

func (p *Proxier) onServiceAdded(obj interface{}) {
	service, ok := obj.(*v1.Service)
	if !ok {
		glog.Errorf("Unexpected object type: %v", obj)
		return
	}

	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
	if p.serviceChanges.update(&namespacedName, nil, service) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onServiceUpdated(old, new interface{}) {
	oldService, ok := old.(*v1.Service)
	if !ok {
		glog.Errorf("Unexpected object type: %v", old)
		return
	}
	service, ok := new.(*v1.Service)
	if !ok {
		glog.Errorf("Unexpected object type: %v", new)
		return
	}

	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
	if p.serviceChanges.update(&namespacedName, oldService, service) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onServiceDeleted(obj interface{}) {
	service, ok := obj.(*v1.Service)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
		if service, ok = tombstone.Obj.(*v1.Service); !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
	}

	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
	if p.serviceChanges.update(&namespacedName, service, nil) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) getRouterForNamespace(namespace string) (string, error) {
	// Only one network per namespace is supported, and the network's name is the
	// same as the namespace.
	// TODO: make it general after multi-network is supported.
	networkName := util.BuildNetworkName(namespace, namespace)
	network, err := p.osClient.GetNetwork(networkName)
	if err != nil {
		glog.Errorf("Get network by name %q failed: %v", networkName, err)
		return "", err
	}

	ports, err := p.osClient.ListPorts(network.Uid, "network:router_interface")
	if err != nil {
		glog.Errorf("Get port list for network %q failed: %v", networkName, err)
		return "", err
	}

	if len(ports) == 0 {
		glog.Errorf("Got zero router interfaces for network %q", networkName)
		return "", fmt.Errorf("no router interface found")
	}

	return ports[0].DeviceID, nil
}

func (p *Proxier) onEndpointsAdded(obj interface{}) {
	endpoints, ok := obj.(*v1.Endpoints)
	if !ok {
		glog.Errorf("Unexpected object type: %v", obj)
		return
	}

	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
	if p.endpointsChanges.update(&namespacedName, nil, endpoints) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onEndpointUpdated(old, new interface{}) {
	oldEndpoints, ok := old.(*v1.Endpoints)
	if !ok {
		glog.Errorf("Unexpected object type: %v", old)
		return
	}
	endpoints, ok := new.(*v1.Endpoints)
	if !ok {
		glog.Errorf("Unexpected object type: %v", new)
		return
	}

	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
	if p.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onEndpointDeleted(obj interface{}) {
	endpoints, ok := obj.(*v1.Endpoints)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
		if endpoints, ok = tombstone.Obj.(*v1.Endpoints); !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
	}

	namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
	if p.endpointsChanges.update(&namespacedName, endpoints, nil) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onNamespaceAdded(obj interface{}) {
	namespace, ok := obj.(*v1.Namespace)
	if !ok {
		glog.Errorf("Unexpected object type: %v", obj)
		return
	}

	if p.namespaceChanges.update(namespace.Name, nil, namespace) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onNamespaceUpdated(old, new interface{}) {
	oldNamespace, ok := old.(*v1.Namespace)
	if !ok {
		glog.Errorf("Unexpected object type: %v", old)
		return
	}
	namespace, ok := new.(*v1.Namespace)
	if !ok {
		glog.Errorf("Unexpected object type: %v", new)
		return
	}

	if p.namespaceChanges.update(oldNamespace.Name, oldNamespace, namespace) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

func (p *Proxier) onNamespaceDeleted(obj interface{}) {
	namespace, ok := obj.(*v1.Namespace)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
		if namespace, ok = tombstone.Obj.(*v1.Namespace); !ok {
			glog.Errorf("Unexpected object type: %v", obj)
			return
		}
	}

	if p.namespaceChanges.update(namespace.Name, namespace, nil) && p.isInitialized() {
		p.syncRunner.Run()
	}
}

// RegisterInformers registers event handlers for the namespace, service and
// endpoints informers.
func (p *Proxier) RegisterInformers() {
	p.namespaceInformer = p.factory.Core().V1().Namespaces()
	p.namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    p.onNamespaceAdded,
			UpdateFunc: p.onNamespaceUpdated,
			DeleteFunc: p.onNamespaceDeleted,
		},
		time.Minute)

	p.serviceInformer = p.factory.Core().V1().Services()
	p.serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    p.onServiceAdded,
			UpdateFunc: p.onServiceUpdated,
			DeleteFunc: p.onServiceDeleted,
		},
		time.Minute)

	p.endpointInformer = p.factory.Core().V1().Endpoints()
	p.endpointInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    p.onEndpointsAdded,
			UpdateFunc: p.onEndpointUpdated,
			DeleteFunc: p.onEndpointDeleted,
		},
		time.Minute)
}

// StartNamespaceInformer waits for the namespace cache to sync, then triggers
// an initial rule sync.
func (p *Proxier) StartNamespaceInformer(stopCh <-chan struct{}) error {
	if !cache.WaitForCacheSync(stopCh, p.namespaceInformer.Informer().HasSynced) {
		return fmt.Errorf("failed to cache namespaces")
	}

	glog.Infof("Namespace informer cached.")

	// Update sync status.
	p.mu.Lock()
	p.namespaceSynced = true
	p.setInitialized(p.servicesSynced && p.endpointsSynced && p.namespaceSynced)
	p.mu.Unlock()

	// Sync unconditionally - this is called once per lifetime.
	p.syncProxyRules()

	return nil
}

// StartServiceInformer waits for the services cache to sync, then triggers
// an initial rule sync.
func (p *Proxier) StartServiceInformer(stopCh <-chan struct{}) error {
	if !cache.WaitForCacheSync(stopCh, p.serviceInformer.Informer().HasSynced) {
		return fmt.Errorf("failed to cache services")
	}

	glog.Infof("Services informer cached.")

	// Update sync status.
	p.mu.Lock()
	p.servicesSynced = true
	p.setInitialized(p.servicesSynced && p.endpointsSynced && p.namespaceSynced)
	p.mu.Unlock()

	// Sync unconditionally - this is called once per lifetime.
	p.syncProxyRules()

	return nil
}

// StartEndpointInformer waits for the endpoints cache to sync, then triggers
// an initial rule sync.
func (p *Proxier) StartEndpointInformer(stopCh <-chan struct{}) error {
	if !cache.WaitForCacheSync(stopCh, p.endpointInformer.Informer().HasSynced) {
		return fmt.Errorf("failed to cache endpoints")
	}

	glog.Infof("Endpoints informer cached.")

	// Update sync status.
	p.mu.Lock()
	p.endpointsSynced = true
	p.setInitialized(p.servicesSynced && p.endpointsSynced && p.namespaceSynced)
	p.mu.Unlock()

	// Sync unconditionally - this is called once per lifetime.
	p.syncProxyRules()

	return nil
}

// StartInformerFactory starts the shared informer factory.
func (p *Proxier) StartInformerFactory(stopCh <-chan struct{}) error {
	p.factory.Start(stopCh)
	return nil
}

// SyncLoop runs the periodic sync loop and never returns.
func (p *Proxier) SyncLoop() error {
	p.syncRunner.Loop(wait.NeverStop)
	return nil
}

func (p *Proxier) updateCaches() {
	// Update serviceMap.
	func() {
		p.serviceChanges.lock.Lock()
		defer p.serviceChanges.lock.Unlock()
		for _, change := range p.serviceChanges.items {
			existingPorts := p.serviceMap.merge(change.current)
			p.serviceMap.unmerge(change.previous, existingPorts)
		}
		p.serviceChanges.items = make(map[types.NamespacedName]*serviceChange)
	}()

	// Update service grouping by namespace.
	func() {
		for svc := range p.serviceMap {
			info := p.serviceMap[svc]
			if v, ok := p.serviceNSMap[svc.Namespace]; ok {
				v[svc] = info
			} else {
				p.serviceNSMap[svc.Namespace] = proxyServiceMap{svc: info}
			}
		}
	}()

	// Update endpointsMap.
	func() {
		p.endpointsChanges.lock.Lock()
		defer p.endpointsChanges.lock.Unlock()
		for _, change := range p.endpointsChanges.items {
			p.endpointsMap.unmerge(change.previous)
			p.endpointsMap.merge(change.current)
		}
		p.endpointsChanges.items = make(map[types.NamespacedName]*endpointsChange)
	}()

	// Update namespaceMap and get the router for each namespace.
	func() {
		p.namespaceChanges.lock.Lock()
		defer p.namespaceChanges.lock.Unlock()
		for n, change := range p.namespaceChanges.items {
			if change.current == nil {
				delete(p.namespaceMap, n)
			} else {
				if _, ok := p.namespaceMap[n]; !ok {
					p.namespaceMap[n] = change.current
				}
				// Get the router for the namespace.
				if p.namespaceMap[n].router == "" {
					router, err := p.getRouterForNamespace(n)
					if err != nil {
						glog.Warningf("Get router for namespace %q failed: %v. This may be caused by network not ready yet.", n, err)
						continue
					}
					p.namespaceMap[n].router = router
				}
			}
		}
		p.namespaceChanges.items = make(map[string]*namespaceChange)
	}()
}

func (p *Proxier) syncProxyRules() {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Don't sync rules until we've received services, endpoints and namespaces.
	if !p.servicesSynced || !p.endpointsSynced || !p.namespaceSynced {
		glog.V(2).Info("Not syncing iptables until services, endpoints and namespaces have been received from master")
		return
	}

	// Update local caches.
	p.updateCaches()

	glog.V(3).Infof("Syncing iptables rules")

	// iptablesData contains the iptables rules for the netns.
	iptablesData := bytes.NewBuffer(nil)

	// Sync iptables rules for services.
	for namespace := range p.serviceNSMap {
		iptablesData.Reset()

		// Step 1: get namespace info.
		nsInfo, ok := p.namespaceMap[namespace]
		if !ok {
			glog.Errorf("Namespace %q doesn't exist in caches", namespace)
			continue
		}
		glog.V(3).Infof("Syncing iptables for namespace %q: %v", namespace, nsInfo)

		// Step 2: try to get the router again, since the router may be created
		// later than the namespace.
		if nsInfo.router == "" {
			router, err := p.getRouterForNamespace(namespace)
			if err != nil {
				glog.Warningf("Get router for namespace %q failed: %v. This may be caused by network not ready yet.", namespace, err)
				continue
			}
			nsInfo.router = router
		}

		// Step 3: compose the iptables chain.
		netns := getRouterNetns(nsInfo.router)
		if !netnsExist(netns) {
			glog.V(3).Infof("Netns %q doesn't exist, omitting the services in namespace %q", netns, namespace)
			continue
		}
		ipt := NewIptables(netns)
		// Ensure chain STACKUBE-PREROUTING is created.
		err := ipt.ensureChain()
		if err != nil {
			glog.Errorf("EnsureChain %q in netns %q failed: %v", ChainSKPrerouting, netns, err)
			continue
		}
		// Link the STACKUBE-PREROUTING chain.
		err = ipt.ensureRule(opAddpendRule, ChainPrerouting, []string{
			"-m", "comment", "--comment", "stackube service portals", "-j", ChainSKPrerouting,
		})
		if err != nil {
			glog.Errorf("Link chain %q in netns %q failed: %v", ChainSKPrerouting, netns, err)
			continue
		}

		// Step 4: flush chain STACKUBE-PREROUTING.
		writeLine(iptablesData, []string{"*nat"}...)
		writeLine(iptablesData, []string{":" + ChainSKPrerouting, "-", "[0:0]"}...)
		writeLine(iptablesData, []string{opFlushChain, ChainSKPrerouting}...)
		writeLine(iptablesData, []string{"COMMIT"}...)

		// Step 5: compose rules for each service.
		glog.V(5).Infof("Syncing iptables for services %v", p.serviceNSMap[namespace])
		writeLine(iptablesData, []string{"*nat"}...)
		for svcName, svcInfo := range p.serviceNSMap[namespace] {
			protocol := strings.ToLower(string(svcInfo.protocol))
			svcNameString := svcInfo.serviceNameString

			// Step 5.1: check the service type.
			// Only ClusterIP services are supported. NodePort services are not supported
			// since networks are L2 isolated. LoadBalancer services are handled in the
			// service controller.
			if svcInfo.serviceType != v1.ServiceTypeClusterIP {
				glog.V(3).Infof("Only ClusterIP service is supported, omitting service %q", svcName.NamespacedName)
				continue
			}

			// Step 5.2: check the endpoints.
			// If the service has no endpoints then do nothing.
			if len(p.endpointsMap[svcName]) == 0 {
				glog.V(3).Infof("No endpoints found for service %q", svcName.NamespacedName)
				continue
			}

			// Step 5.3: generate the per-endpoint rules.
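			// Note on load balancing (a sketch of the intent; it assumes the probability
			// helper, defined elsewhere in this package, renders 1/m as the iptables
			// --probability argument): with n endpoints, rule i (0-indexed) matches with
			// conditional probability 1/(n-i) and the final rule always matches, so each
			// endpoint is selected with overall probability 1/n, spreading traffic
			// uniformly across endpoints, similar to the kube-proxy iptables mode.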
			// -A STACKUBE-PREROUTING -d 10.108.230.103 -m comment --comment "default/http: cluster IP"
			//   -m tcp -p tcp --dport 80 -m statistic --mode random --probability 1.0
			//   -j DNAT --to-destination 192.168.1.7:80
			n := len(p.endpointsMap[svcName])
			for i, ep := range p.endpointsMap[svcName] {
				args := []string{
					"-A", ChainSKPrerouting,
					"-m", "comment", "--comment", svcNameString,
					"-m", protocol, "-p", protocol,
					"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
					"--dport", strconv.Itoa(svcInfo.port),
				}

				if i < (n - 1) {
					// Each rule is a probabilistic match.
					args = append(args,
						"-m", "statistic",
						"--mode", "random",
						"--probability", probability(n-i))
				}
				// The final (or only if n == 1) rule is a guaranteed match.
				args = append(args, "-j", "DNAT", "--to-destination", ep.endpoint)
				writeLine(iptablesData, args...)
			}
		}
		writeLine(iptablesData, []string{"COMMIT"}...)

		// Step 6: execute iptables-restore.
		err = ipt.restoreAll(iptablesData.Bytes())
		if err != nil {
			glog.Errorf("Failed to execute iptables-restore: %v", err)
			continue
		}
	}
}