A Deep Dive into Calico Network Policy

2022-03-16 06:51


This article explores how the Calico project implements Kubernetes network policy (NetworkPolicy).

A network policy is an application-centric construct: it sets rules that specify how a Pod may communicate with various network "entities". In the words of the Kubernetes documentation:

NetworkPolicies are an application-centric construct which allow you to specify how a pod is allowed to communicate with various network “entities” (we use the word “entity” here to avoid overloading the more common terms such as “endpoints” and “services”, which have specific Kubernetes connotations) over the network.

$ kubectl create deployment --namespace=policy-demo nginx --image=nginx
deployment.apps/nginx created
$ kubectl create -f - <<EOF
kind: NetworkPolicy
apiVersion: networking.k8s.io/v1
metadata:
  name: access-nginx
  namespace: policy-demo
spec:
  podSelector:
    matchLabels:
      app: nginx
  ingress:
    - from:
      - podSelector:
          matchLabels:
            run: access
EOF
$ kubectl get netpol -n policy-demo
NAME           POD-SELECTOR   AGE
access-nginx   app=nginx      12s

This NetworkPolicy allows Pods labeled run: access to reach Pods labeled app: nginx. Because the policy selects the nginx Pods, ingress from any other Pod is denied.
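To make the selector semantics concrete, here is a minimal Go sketch of how the access-nginx policy could be evaluated for a single connection. It is an illustration only: matchLabels and ingressAllowed are hypothetical helpers, and the model ignores namespaces, ports, and the presence of other policies.

```go
package main

import "fmt"

// matchLabels reports whether every key/value pair in selector is present in labels.
// An empty selector matches any label set.
func matchLabels(selector, labels map[string]string) bool {
	for k, v := range selector {
		if got, ok := labels[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// ingressAllowed is a simplified, hypothetical model of one ingress policy:
// if the policy does not select the destination Pod it imposes no restriction,
// otherwise only sources matching the "from" selector are allowed.
func ingressAllowed(policyTarget, from, srcLabels, dstLabels map[string]string) bool {
	if !matchLabels(policyTarget, dstLabels) {
		return true // the policy does not apply to this Pod
	}
	return matchLabels(from, srcLabels)
}

func main() {
	target := map[string]string{"app": "nginx"}  // spec.podSelector
	from := map[string]string{"run": "access"}   // ingress.from.podSelector
	nginx := map[string]string{"app": "nginx"}   // labels of the destination Pod

	fmt.Println(ingressAllowed(target, from, map[string]string{"run": "access"}, nginx))
	fmt.Println(ingressAllowed(target, from, map[string]string{"run": "other"}, nginx))
}
```

The first call prints true and the second prints false, which matches the behavior described for the policy.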

Once Calico is deployed, every node in the Kubernetes cluster runs a daemon process named calico-node.

Manifest: https://projectcalico.docs.tigera.io/manifests/calico.yaml

containers:
  # Runs calico-node container on each Kubernetes node. This
  # container programs network policy and routes on each
  # host.
  - name: calico-node
    image: docker.io/calico/node:v3.22.1
    envFrom:
    - configMapRef:
        # Allow KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT to be overridden for eBPF mode.
        name: kubernetes-services-endpoint
        optional: true
    env:
      # Use Kubernetes API as the backing datastore.
      - name: DATASTORE_TYPE
        value: "kubernetes"
      # Wait for the datastore.
      - name: WAIT_FOR_DATASTORE
        value: "true"
      # Set based on the k8s node name.
      - name: NODENAME
        valueFrom:
          fieldRef:
            fieldPath: spec.nodeName
      # Choose the backend to use.
      - name: CALICO_NETWORKING_BACKEND
        valueFrom:
          configMapKeyRef:
            name: calico-config
            key: calico_backend
      # Cluster type to identify the deployment type
      - name: CLUSTER_TYPE
        value: "k8s,bgp"
      # Auto-detect the BGP IP address.
      - name: IP
        value: "autodetect"
      # Enable IPIP
      - name: CALICO_IPV4POOL_IPIP
        value: "Always"
      # Enable or Disable VXLAN on the default IP pool.
      - name: CALICO_IPV4POOL_VXLAN
        value: "Never"
      # Set MTU for tunnel device used if ipip is enabled
      - name: FELIX_IPINIPMTU
        valueFrom:
          configMapKeyRef:
            name: calico-config
            key: veth_mtu
      # Set MTU for the VXLAN tunnel device.
      - name: FELIX_VXLANMTU
        valueFrom:
          configMapKeyRef:
            name: calico-config
            key: veth_mtu
      # Set MTU for the Wireguard tunnel device.
      - name: FELIX_WIREGUARDMTU
        valueFrom:
          configMapKeyRef:
            name: calico-config
            key: veth_mtu
      # The default IPv4 pool to create on startup if none exists. Pod IPs will be
      # chosen from this range. Changing this value after installation will have
      # no effect. This should fall within `--cluster-cidr`.
      # - name: CALICO_IPV4POOL_CIDR
      #   value: "192.168.0.0/16"
      # Disable file logging so `kubectl logs` works.
      - name: CALICO_DISABLE_FILE_LOGGING
        value: "true"
      # Set Felix endpoint to host default action to ACCEPT.
      - name: FELIX_DEFAULTENDPOINTTOHOSTACTION
        value: "ACCEPT"
      # Disable IPv6 on Kubernetes.
      - name: FELIX_IPV6SUPPORT
        value: "false"
      - name: FELIX_HEALTHENABLED
        value: "true"

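The DaemonSet definition does not reveal what calico-node actually starts; that is presumably packaged in the image[1]. So we use ps to look at the startup commands of the calico-node processes: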

$ ps -ef | grep calico-node
root     10726 10718  2 06:44 ?        00:00:16 calico-node -felix
root     10727 10725  0 06:44 ?        00:00:00 calico-node -monitor-token
root     10728 10719  0 06:44 ?        00:00:00 calico-node -monitor-addresses
root     10729 10721  0 06:44 ?        00:00:00 calico-node -status-reporter
root     10731 10724  0 06:44 ?        00:00:00 calico-node -confd
root     10733 10720  0 06:44 ?        00:00:00 calico-node -allocate-tunnel-addrs
root     22100 10594  0 06:53 pts/0    00:00:00 grep --color=auto calico-node

As a spoiler, the process that watches NetworkPolicy objects in the Kubernetes cluster is felix.

The source file for the calico-node command is https://github.com/projectcalico/calico/blob/v3.22.1/node/cmd/calico-node/main.go:

var runFelix = flagSet.Bool("felix", false, "Run Felix")

func main() {
    // a lot of code here

    if *version {
        fmt.Println(startup.VERSION)
        os.Exit(0)
    } else if *runFelix {
        logrus.SetFormatter(&logutils.Formatter{Component: "felix"})
        felix.Run("/etc/calico/felix.cfg", buildinfo.GitVersion, buildinfo.BuildDate, buildinfo.GitRevision)
    }

    // a lot of code here
}
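The flag-based dispatch above can be sketched with the standard library alone. This is a hedged illustration, not the calico-node source: selectMode is a hypothetical helper, and only the -felix and -confd flags (both visible in the ps output) are modeled.

```go
package main

import (
	"flag"
	"fmt"
)

// selectMode parses args with its own FlagSet and returns the name of the
// component that a single binary would run. Hypothetical helper for illustration.
func selectMode(args []string) (string, error) {
	flagSet := flag.NewFlagSet("calico-node-sketch", flag.ContinueOnError)
	runFelix := flagSet.Bool("felix", false, "Run Felix")
	runConfd := flagSet.Bool("confd", false, "Run confd")
	if err := flagSet.Parse(args); err != nil {
		return "", err
	}
	switch {
	case *runFelix:
		return "felix", nil
	case *runConfd:
		return "confd", nil
	default:
		return "none", nil
	}
}

func main() {
	mode, err := selectMode([]string{"-felix"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("mode:", mode)
}
```

One binary with several boolean mode flags is what lets the same image run felix, confd, and the other helper processes side by side.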

Next, let's look at the startup function of the calico-node felix instance, https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go#L88-L652:

func Run(configFile string, gitVersion string, buildDate string, gitRevision string) {
    // a lot of code here
    backendClient, err = backend.NewClient(datastoreConfig)
    if err != nil {
        log.WithError(err).Error("Failed to (re)connect to datastore")
        time.Sleep(1 * time.Second)
        continue configRetry
    }
    // a lot of code here
    var syncer Startable
    if typhaAddr != "" {
        // a lot of code here
    } else {
        // Use the syncer locally.
        syncer = felixsyncer.New(backendClient, datastoreConfig.Spec, syncerToValidator, configParams.IsLeader())
    }
}

Because Calico was deployed with Kubernetes as its datastore, backendClient here is a kubeclient that talks to the apiserver. Next comes felixsyncer.New, which initializes the syncer instance, https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/syncersv1/felixsyncer/felixsyncerv1.go#L28-L111:

func New(client api.Client, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.SyncerCallbacks, isLeader bool) api.Syncer {
    // a lot of code here
    if cfg.DatastoreType == apiconfig.Kubernetes {
        additionalTypes = append(additionalTypes, watchersyncer.ResourceType{
            ListInterface:   model.ResourceListOptions{Kind: model.KindKubernetesNetworkPolicy},
            UpdateProcessor: updateprocessors.NewNetworkPolicyUpdateProcessor(),
        })
        additionalTypes = append(additionalTypes, watchersyncer.ResourceType{
            ListInterface: model.ResourceListOptions{Kind: model.KindKubernetesEndpointSlice},
        })
    }
    // a lot of code here

    return watchersyncer.New(
        client,
        resourceTypes,
        callbacks,
    )
}

It returns an instance of the watcherSyncer[2] struct, which implements the api.Syncer[3] interface:

type watcherSyncer struct {
    status        api.SyncStatus
    watcherCaches []*watcherCache
    results       chan interface{}
    numSynced     int
    callbacks     api.SyncerCallbacks
    wgwc          *sync.WaitGroup
    wgws          *sync.WaitGroup
    cancel        context.CancelFunc
}

With Kubernetes as the datastore, felix watches Kubernetes NetworkPolicy resources in addition to Calico's own CRD resources (such as GlobalNetworkPolicy).

Back in daemon.go[4]: once the syncer has been instantiated, the next step is to start it, https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go#L489-L493:

    if syncer != nil {
        log.Infof("Starting the datastore Syncer")
        syncer.Start()
    }

Continue to https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/watchersyncer/watchersyncer.go:

func (ws *watcherSyncer) Start() {
    // a lot of code here
    go func() {
        ws.run(ctx)
        log.Debug("Watcher syncer run completed")
    }()
}

func (ws *watcherSyncer) run(ctx context.Context) {
    for _, wc := range ws.watcherCaches {
        ws.wgwc.Add(1)
        go func(wc *watcherCache) {
            wc.run(ctx)
            ws.wgwc.Done()
        }(wc)
    }
}

It iterates over the watcherCache slice and runs each entry's run method in its own goroutine, https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/watchersyncer/watchercache.go#L76-L142:

func (wc *watcherCache) run(ctx context.Context) {
    // a lot of code here
mainLoop:
    for {
        if wc.watch == nil {
            // The watcher will be nil if the context cancelled during a resync.
            wc.logger.Debug("Watch is nil. Returning")
            break mainLoop
        }
        select {
        case <-ctx.Done():
            wc.logger.Debug("Context is done. Returning")
            wc.cleanExistingWatcher()
            break mainLoop
        case event, ok := <-wc.watch.ResultChan():
            if !ok {
                // If the channel is closed then resync/recreate the watch.
                wc.logger.Info("Watch channel closed by remote - recreate watcher")
                wc.resyncAndCreateWatcher(ctx)
                continue
            }
            wc.logger.WithField("RC", wc.watch.ResultChan()).Debug("Reading event from results channel")

            // Handle the specific event type.
            switch event.Type {
            case api.WatchAdded, api.WatchModified:
                kvp := event.New
                wc.handleWatchListEvent(kvp)
            case api.WatchDeleted:
                // Nil out the value to indicate a delete.
                kvp := event.Old
                if kvp == nil {
                    // Bug, we're about to panic when we hit the nil pointer, log something useful.
                    wc.logger.WithField("watcher", wc).WithField("event", event).Panic("Deletion event without old value")
                }
                kvp.Value = nil
                wc.handleWatchListEvent(kvp)
            case api.WatchError:
                // Handle a WatchError. This error triggered from upstream, all type
                // of WatchError are treated equally,log the Error and trigger a full resync. We only log at info
                // because errors may occur due to compaction causing revisions to no longer be valid - in this case
                // we simply need to do a full resync.
                wc.logger.WithError(event.Error).Infof("Watch error received from Upstream")
                wc.currentWatchRevision = "0"
                wc.resyncAndCreateWatcher(ctx)
            default:
                // Unknown event type - not much we can do other than log.
                wc.logger.WithField("EventType", event.Type).Errorf("Unknown event type received from the datastore")
            }
        }
    }

   // a lot of code here
}
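The shape of this loop (read watch events, keep a local cache, forward one update per effective change) can be sketched in a few lines of Go. This is a simplified model under stated assumptions: the event and update types are hypothetical, a delete is signalled by a nil value as in the excerpt above, and the sketch simply stops when the channel closes instead of resyncing and recreating the watch.

```go
package main

import "fmt"

type eventType int

const (
	added eventType = iota
	modified
	deleted
)

// event is a hypothetical stand-in for a datastore watch event.
type event struct {
	typ   eventType
	key   string
	value string
}

// update is what the loop forwards downstream; a nil value indicates a delete.
type update struct {
	key   string
	value *string
}

// runWatch drains events until the channel is closed, maintains a local cache,
// and sends one update for every change that actually alters the cache.
func runWatch(events <-chan event, results chan<- update) map[string]string {
	cache := map[string]string{}
	for ev := range events {
		switch ev.typ {
		case added, modified:
			if old, ok := cache[ev.key]; ok && old == ev.value {
				continue // nothing changed, nothing to report
			}
			cache[ev.key] = ev.value
			v := ev.value
			results <- update{key: ev.key, value: &v}
		case deleted:
			if _, ok := cache[ev.key]; !ok {
				continue // unknown key, nothing to delete
			}
			delete(cache, ev.key)
			results <- update{key: ev.key, value: nil}
		}
	}
	close(results)
	return cache
}

func main() {
	events := make(chan event, 4)
	results := make(chan update, 4) // buffered so the sketch can run in one goroutine
	events <- event{typ: added, key: "policy-demo/access-nginx", value: "v1"}
	events <- event{typ: modified, key: "policy-demo/access-nginx", value: "v2"}
	events <- event{typ: deleted, key: "policy-demo/access-nginx"}
	close(events)

	cache := runWatch(events, results)
	for u := range results {
		if u.value == nil {
			fmt.Println("delete", u.key)
		} else {
			fmt.Println("set", u.key, *u.value)
		}
	}
	fmt.Println("cache size:", len(cache))
}
```

Keeping a cache next to the watch is what would allow a full resync to be turned into a minimal set of updates for downstream consumers.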

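When a resource creation, modification, or deletion event is observed, it is handled accordingly. The code jumps around a lot here, so it is not reproduced. For example, when we create a NetworkPolicy object in the Kubernetes cluster, an "update" message is sent to the results channel of the watcherCache for NetworkPolicy, https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/watchersyncer/watchercache.go#L333-L372: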

    wc.results <- []api.Update{{
        UpdateType: api.UpdateTypeKVNew,
        KVPair:     *kvp,
    }}

The watcherCache holds a reference to the watcherSyncer's results channel, https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/watchersyncer/watchercache.go#L64-L74.

This eventually triggers the api.SyncerCallbacks callbacks that were set when the watcherSyncer was initialized, https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go#L459:

syncerToValidator := calc.NewSyncerCallbacksDecoupler()

There is a lot of related code here, so we will not go into detail.


When the felix instance starts, it instantiates and starts an AsyncCalcGraph object, which dynamically computes IP set membership and "activates" policies, https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go#L531-L597:

    asyncCalcGraph := calc.NewAsyncCalcGraph(
        configParams.Copy(), // Copy to avoid concurrent access.
        calcGraphClientChannels,
        healthAggregator,
    )

    // a lot of code here
    asyncCalcGraph.Start()

asyncCalcGraph starts a loop that listens for update events, https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/async_calc_graph.go#L136-L194:

func (acg *AsyncCalcGraph) loop() {
    for {
        select {
        case update := <-acg.inputEvents:
            switch update := update.(type) {
            case []api.Update:
                // Update; send it to the dispatcher.
                log.Debug("Pulled []KVPair off channel")
                for i, upd := range update {
                    // Send the updates individually so that we can report live in between
                    // each update.  (The dispatcher sends individual updates anyway so this makes
                    // no difference.)
                    updStartTime := time.Now()
                    acg.AllUpdDispatcher.OnUpdates(update[i : i+1]) // here
                    summaryUpdateTime.Observe(time.Since(updStartTime).Seconds())
                    // Record stats for the number of messages processed.
                    typeName := reflect.TypeOf(upd.Key).Name()
                    count := countUpdatesProcessed.WithLabelValues(typeName)
                    count.Inc()
                    acg.reportHealth()
                }
                // a lot of code here
            }
        }
        acg.maybeFlush()
    }
}

Next, look at the AllUpdDispatcher member. It is initialized in the NewAsyncCalcGraph[5] function, which leads to https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L114-L408:

  • allUpdDispatcher := dispatcher.NewDispatcher() instantiates an allUpdDispatcher object[6]
  • The dispatchers for the various resources are registered with allUpdDispatcher
    • local endpoints[7]
    • service index[8]
    • ipset member[9]
    • endpoint policy resolver[10]
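The registration step in the list above follows a familiar pattern: handlers subscribe to a key type, and each update is routed by the concrete type of its key. The sketch below shows that general pattern only. It is not Felix's dispatcher; policyKey, endpointKey, register, and onUpdates are hypothetical names chosen for the illustration.

```go
package main

import (
	"fmt"
	"reflect"
)

// Hypothetical key types standing in for model.PolicyKey and model.WorkloadEndpointKey.
type policyKey struct{ name string }
type endpointKey struct{ id string }

type update struct {
	key   interface{}
	value interface{}
}

type handler func(update)

// dispatcher fans each update out to the handlers registered for the
// concrete type of the update's key.
type dispatcher struct {
	handlers map[reflect.Type][]handler
}

func newDispatcher() *dispatcher {
	return &dispatcher{handlers: map[reflect.Type][]handler{}}
}

// register subscribes h to updates whose key has the same type as keyExample.
func (d *dispatcher) register(keyExample interface{}, h handler) {
	t := reflect.TypeOf(keyExample)
	d.handlers[t] = append(d.handlers[t], h)
}

// onUpdates delivers updates one at a time, in order.
func (d *dispatcher) onUpdates(updates []update) {
	for _, u := range updates {
		for _, h := range d.handlers[reflect.TypeOf(u.key)] {
			h(u)
		}
	}
}

func main() {
	d := newDispatcher()
	var seen []string
	d.register(policyKey{}, func(u update) {
		seen = append(seen, "policy:"+u.key.(policyKey).name)
	})
	d.register(endpointKey{}, func(u update) {
		seen = append(seen, "endpoint:"+u.key.(endpointKey).id)
	})
	d.onUpdates([]update{
		{key: policyKey{name: "access-nginx"}},
		{key: endpointKey{id: "nginx-pod"}},
	})
	fmt.Println(seen)
}
```

Routing by key type keeps each consumer (endpoint index, IP set calculation, policy resolver) unaware of resource kinds it does not care about.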

This article only covers the NetworkPolicy-related part, which brings us to https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/policy_resolver.go:

func (pr *PolicyResolver) OnUpdate(update api.Update) (filterOut bool) {
    policiesDirty := false
    switch key := update.Key.(type) {
    case model.WorkloadEndpointKey, model.HostEndpointKey:
        if update.Value != nil {
            pr.endpoints[key] = update.Value
        } else {
            delete(pr.endpoints, key)
        }
        pr.dirtyEndpoints.Add(key)
        gaugeNumActiveEndpoints.Set(float64(len(pr.endpoints)))
    case model.PolicyKey:
        log.Debugf("Policy update: %v", key)
        policiesDirty = pr.policySorter.OnUpdate(update)
        if policiesDirty {
            pr.markEndpointsMatchingPolicyDirty(key)
        }
    }
    pr.sortRequired = pr.sortRequired || policiesDirty
    pr.maybeFlush()
    gaugeNumActivePolicies.Set(float64(pr.policyIDToEndpointIDs.Len()))
    return
}

func (pr *PolicyResolver) maybeFlush() {
    if !pr.InSync {
        log.Debugf("Not in sync, skipping flush")
        return
    }
    if pr.sortRequired {
        pr.refreshSortOrder()
    }
    pr.dirtyEndpoints.Iter(pr.sendEndpointUpdate)
    pr.dirtyEndpoints = set.New()
}
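The two functions above combine a dirty set with an in-sync gate: changes are recorded as they arrive but only flushed once the initial snapshot is complete. A minimal sketch of that idea follows; the resolver type and its markInSync method are hypothetical, and policy sorting is left out entirely.

```go
package main

import (
	"fmt"
	"sort"
)

// resolver is a hypothetical, trimmed-down analogue of the pattern: it records
// dirty endpoint IDs and flushes them only after it has been marked in sync.
type resolver struct {
	inSync  bool
	dirty   map[string]struct{}
	flushed []string
}

func newResolver() *resolver {
	return &resolver{dirty: map[string]struct{}{}}
}

// onEndpointUpdate marks the endpoint dirty and attempts a flush.
func (r *resolver) onEndpointUpdate(id string) {
	r.dirty[id] = struct{}{}
	r.maybeFlush()
}

// markInSync signals that the initial snapshot has been fully received.
func (r *resolver) markInSync() {
	r.inSync = true
	r.maybeFlush()
}

// maybeFlush emits every dirty endpoint once, then resets the dirty set.
func (r *resolver) maybeFlush() {
	if !r.inSync {
		return // still loading the snapshot, keep accumulating
	}
	ids := make([]string, 0, len(r.dirty))
	for id := range r.dirty {
		ids = append(ids, id)
	}
	sort.Strings(ids) // deterministic order, for the example only
	r.flushed = append(r.flushed, ids...)
	r.dirty = map[string]struct{}{}
}

func main() {
	r := newResolver()
	r.onEndpointUpdate("ep-b")
	r.onEndpointUpdate("ep-a")
	r.onEndpointUpdate("ep-b") // duplicate, coalesced by the set
	fmt.Println("flushed before sync:", len(r.flushed))
	r.markInSync()
	fmt.Println("flushed after sync:", r.flushed)
}
```

Before the sync point nothing is emitted; afterwards each dirty endpoint is emitted exactly once, however many updates touched it.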

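Whenever a NetworkPolicy or Endpoint is added, removed, or modified, the corresponding event is triggered through the PolicyResolverCallbacks callback. The callbacks are what we are after; the NetworkPolicy-related ones are declared in an interface, https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L49-L106: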

type PipelineCallbacks interface {
    ipSetUpdateCallbacks
    rulesUpdateCallbacks
    endpointCallbacks
    configCallbacks
    passthruCallbacks
    routeCallbacks
    vxlanCallbacks
}

type rulesUpdateCallbacks interface {
    OnPolicyActive(model.PolicyKey, *ParsedRules)
    OnPolicyInactive(model.PolicyKey)
    OnProfileActive(model.ProfileRulesKey, *ParsedRules)
    OnProfileInactive(model.ProfileRulesKey)
}

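The OnPolicyActive callback of the rulesUpdateCallbacks interface is invoked by the RuleScanner[11] struct, whose own OnPolicyActive method first parses the policy's rules: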

https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/rule_scanner.go#L165-L168

func (rs *RuleScanner) OnPolicyActive(key model.PolicyKey, policy *model.Policy) {
    parsedRules := rs.updateRules(key, policy.InboundRules, policy.OutboundRules, policy.DoNotTrack, policy.PreDNAT, policy.Namespace)
    rs.RulesUpdateCallbacks.OnPolicyActive(key, parsedRules)
}

Things are getting more and more complicated, so we will stop digging here.


Back in the felix Run function, a DataplaneDriver is instantiated and started, https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go#L398-L403:

    dpDriver, dpDriverCmd = dp.StartDataplaneDriver(
        configParams.Copy(), // Copy to avoid concurrent access.
        healthAggregator,
        configChangedRestartCallback,
        fatalErrorCallback,
        k8sClientSet)

Jump to the dp.StartDataplaneDriver[12] function:

func StartDataplaneDriver(configParams *config.Config,
    healthAggregator *health.HealthAggregator,
    configChangedRestartCallback func(),
    fatalErrorCallback func(error),
    k8sClientSet *kubernetes.Clientset) (DataplaneDriver, *exec.Cmd) {
    // a lot of code here
        intDP := intdataplane.NewIntDataplaneDriver(dpConfig)
        intDP.Start()
    // a lot of code here
}

Continue to the intdataplane.NewIntDataplaneDriver[13] function:

func NewIntDataplaneDriver(config Config) *InternalDataplane {
    // a lot of code here
    if !config.BPFEnabled {
        dp.RegisterManager(newPolicyManager(rawTableV4, mangleTableV4, filterTableV4, ruleRenderer, 4))
    }
    // a lot of code here
}

If eBPF is not enabled, a PolicyManager[14] is registered. The relevant code is in https://github.com/projectcalico/calico/blob/v3.22.1/felix/dataplane/linux/policy_mgr.go:

func newPolicyManager(rawTable, mangleTable, filterTable iptablesTable, ruleRenderer policyRenderer, ipVersion uint8) *policyManager {
    return &policyManager{
        rawTable:     rawTable,
        mangleTable:  mangleTable,
        filterTable:  filterTable,
        ruleRenderer: ruleRenderer,
        ipVersion:    ipVersion,
    }
}

This hints at a connection with iptables. Now look at the PolicyManager's OnUpdate[15] method:

func (m *policyManager) OnUpdate(msg interface{}) {
    switch msg := msg.(type) {
    case *proto.ActivePolicyUpdate:
        if m.rawEgressOnly && !msg.Policy.Untracked {
            log.WithField("id", msg.Id).Debug("Ignore non-untracked policy")
            return
        }
        log.WithField("id", msg.Id).Debug("Updating policy chains")
        chains := m.ruleRenderer.PolicyToIptablesChains(msg.Id, msg.Policy, m.ipVersion)
        if m.rawEgressOnly {
            neededIPSets := set.New()
            filteredChains := []*iptables.Chain(nil)
            for _, chain := range chains {
                if strings.Contains(chain.Name, string(rules.PolicyOutboundPfx)) {
                    filteredChains = append(filteredChains, chain)
                    neededIPSets.AddAll(chain.IPSetNames())
                }
            }
            chains = filteredChains
            m.mergeNeededIPSets(msg.Id, neededIPSets)
        }
        // We can't easily tell whether the policy is in use in a particular table, and, if the policy
        // type gets changed it may move between tables.  Hence, we put the policy into all tables.
        // The iptables layer will avoid programming it if it is not actually used.
        m.rawTable.UpdateChains(chains)
        m.mangleTable.UpdateChains(chains)
        m.filterTable.UpdateChains(chains)
    case *proto.ActivePolicyRemove:
        log.WithField("id", msg.Id).Debug("Removing policy chains")
        if m.rawEgressOnly {
            m.mergeNeededIPSets(msg.Id, nil)
        }
        inName := rules.PolicyChainName(rules.PolicyInboundPfx, msg.Id)
        outName := rules.PolicyChainName(rules.PolicyOutboundPfx, msg.Id)
        // As above, we need to clean up in all the tables.
        m.filterTable.RemoveChainByName(inName)
        m.filterTable.RemoveChainByName(outName)
        m.mangleTable.RemoveChainByName(inName)
        m.mangleTable.RemoveChainByName(outName)
        m.rawTable.RemoveChainByName(inName)
        m.rawTable.RemoveChainByName(outName)
    // a lot of code here
    }
}

The PolicyManager converts a NetworkPolicy into iptables chains (for the raw, mangle, and filter tables) and updates the iptables rules on the host, https://github.com/projectcalico/calico/blob/v3.22.1/felix/iptables/table.go#L499-L528:

func (t *Table) UpdateChains(chains []*Chain) {
    for _, chain := range chains {
        t.UpdateChain(chain)
    }
}

func (t *Table) UpdateChain(chain *Chain) {
    t.logCxt.WithField("chainName", chain.Name).Info("Queueing update of chain.")
    oldNumRules := 0

    // Incref any newly-referenced chains, then decref the old ones.  By incrementing first we
    // avoid marking a still-referenced chain as dirty.
    t.increfReferredChains(chain.Rules)
    if oldChain := t.chainNameToChain[chain.Name]; oldChain != nil {
        oldNumRules = len(oldChain.Rules)
        t.decrefReferredChains(oldChain.Rules)
    }
    t.chainNameToChain[chain.Name] = chain
    numRulesDelta := len(chain.Rules) - oldNumRules
    t.gaugeNumRules.Add(float64(numRulesDelta))
    if t.chainRefCounts[chain.Name] > 0 {
        t.dirtyChains.Add(chain.Name)
    }

    // Defensive: make sure we re-read the dataplane state before we make updates.  While the
    // code was originally designed not to need this, we found that other users of
    // iptables-restore can still clobber our updates so it's safest to re-read the state before
    // each write.
    t.InvalidateDataplaneCache("chain update")
}

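The code above only updates the Table's cache, that is, the desired state of the raw, mangle, and filter tables. The iptables rules on the host are actually synchronized when the Apply[16] method is called.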
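The split between queueing a desired state and applying it later can be illustrated with a small model. This is a sketch under stated assumptions, not the Felix Table type: the table struct, its dataplane map (standing in for the kernel's real iptables state), and the method names are all hypothetical, and chain reference counting is omitted.

```go
package main

import (
	"fmt"
	"sort"
)

// table is a hypothetical model: desired holds the chains we want, dataplane
// stands in for what is actually programmed, dirty tracks chains to sync.
type table struct {
	desired   map[string][]string
	dataplane map[string][]string
	dirty     map[string]struct{}
}

func newTable() *table {
	return &table{
		desired:   map[string][]string{},
		dataplane: map[string][]string{},
		dirty:     map[string]struct{}{},
	}
}

// updateChain only records the desired rules and marks the chain dirty.
func (t *table) updateChain(name string, rules []string) {
	t.desired[name] = rules
	t.dirty[name] = struct{}{}
}

// removeChain records that the chain is no longer wanted.
func (t *table) removeChain(name string) {
	delete(t.desired, name)
	t.dirty[name] = struct{}{}
}

// apply writes every dirty chain to the dataplane and returns the names it
// touched, sorted for readability.
func (t *table) apply() []string {
	names := make([]string, 0, len(t.dirty))
	for name := range t.dirty {
		if rules, ok := t.desired[name]; ok {
			t.dataplane[name] = rules
		} else {
			delete(t.dataplane, name)
		}
		names = append(names, name)
	}
	sort.Strings(names)
	t.dirty = map[string]struct{}{}
	return names
}

func main() {
	t := newTable()
	t.updateChain("cali-pi-example", []string{"MARK or 0x10000", "RETURN"})
	fmt.Println("programmed before apply:", len(t.dataplane))
	fmt.Println("applied:", t.apply())
	fmt.Println("programmed after apply:", len(t.dataplane))
}
```

Batching changes this way means many policy updates can be collapsed into one write to the host, rather than one write per update.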

Let's look at the iptables rules on a host in the Kubernetes cluster:

Chain cali-pi-_PfqSzIS1AirpjL0oXbg (1 references)
pkts bytes target prot opt in out source destination
0 0 MARK all -- any any anywhere anywhere /* cali:vPwStapFDttX0Qmr */ /* Policy policy-demo/knp.default.access-nginx ingress */ match-set cali40s:AMAB7BNa5u3MmIiwZrblkWt src MARK or 0x10000
0 0 RETURN all -- any any anywhere anywhere /* cali:JmKby7c_zvSDZn-y */ mark match 0x10000/0x10000

Chain cali-tw-calic6866a25fab (1 references)
pkts bytes target prot opt in out source destination
0 0 ACCEPT all -- any any anywhere anywhere /* cali:5QeAipZ91RWxAvAP */ ctstate RELATED,ESTABLISHED
0 0 DROP all -- any any anywhere anywhere /* cali:nUFU9ZCyUibQGJeM */ ctstate INVALID
0 0 MARK all -- any any anywhere anywhere /* cali:Fb8j6_8umAFH-caO */ MARK and 0xfffeffff
0 0 MARK all -- any any anywhere anywhere /* cali:5EvW0trrqBzvohhg */ /* Start of policies */ MARK and 0xfffdffff
0 0 cali-pi-_PfqSzIS1AirpjL0oXbg all -- any any anywhere anywhere /* cali:3WtlYwIIeClqiGDi */ mark match 0x0/0x20000
0 0 RETURN all -- any any anywhere anywhere /* cali:F9Es9Pigmj_w4QXL */ /* Return if policy accepted */ mark match 0x10000/0x10000
0 0 DROP all -- any any anywhere anywhere /* cali:IxzxRAfuJGtkJERO */ /* Drop if no policies passed packet */ mark match 0x0/0x20000
0 0 cali-pri-kns.policy-demo all -- any any anywhere anywhere /* cali:y1Vh7ciW6qiyN7yE */
0 0 RETURN all -- any any anywhere anywhere /* cali:ocU3P7TVKNDR3w0C */ /* Return if profile accepted */ mark match 0x10000/0x10000
0 0 cali-pri-_QdhVZ8TmSXm2EvYAKH all -- any any anywhere anywhere /* cali:KYnUV6tBjKhOVhvl */
0 0 RETURN all -- any any anywhere anywhere /* cali:-3c2S3tuRZHwglRj */ /* Return if profile accepted */ mark match 0x10000/0x10000
0 0 DROP all -- any any anywhere anywhere /* cali:1X5WKlXPf0ajKXrw */ /* Drop if no profiles matched */

As you can see, Calico uses the iptables mark feature to mark and match network packets, which is how it controls traffic inside the Kubernetes cluster.
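The mark arithmetic in the listing can be replayed in Go to see why a packet ends up accepted or dropped. This is a sketch of one reading of the cali-tw and cali-pi chains shown above: the bit values 0x10000 and 0x20000 are taken from that listing, while the names markAccept and markPass are assumptions inferred from the rule comments, and profile chains are ignored.

```go
package main

import "fmt"

const (
	markAccept uint32 = 0x10000 // set by "MARK or 0x10000" when a policy rule matches
	markPass   uint32 = 0x20000 // cleared at "Start of policies"
)

// policyVerdict replays the policy part of the cali-tw chain for one packet.
// srcInIPSet models the "match-set ... src" test in the cali-pi chain.
// "accept" here means the chain returns with the accept bit set.
func policyVerdict(mark uint32, srcInIPSet bool) (uint32, string) {
	mark &= 0xfffeffff // "MARK and 0xfffeffff": clear the accept bit
	mark &= 0xfffdffff // "MARK and 0xfffdffff": clear the pass bit
	if mark&markPass == 0 { // "mark match 0x0/0x20000": jump into the policy chain
		if srcInIPSet {
			mark |= markAccept // "MARK or 0x10000"
		}
	}
	if mark&markAccept == markAccept { // "mark match 0x10000/0x10000"
		return mark, "accept"
	}
	return mark, "drop" // "Drop if no policies passed packet"
}

func main() {
	m, v := policyVerdict(0, true)
	fmt.Printf("source in IP set:     mark=%#x verdict=%s\n", m, v)
	m, v = policyVerdict(0, false)
	fmt.Printf("source not in IP set: mark=%#x verdict=%s\n", m, v)
}
```

Clearing the bits at the start of the chain matters: a stale accept bit left on the packet earlier cannot leak through and bypass the policy.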

References

[1]

Packaged in the image: https://github.com/projectcalico/calico/blob/v3.22.1/node/Dockerfile.amd64

[2]

watcherSyncer: https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/watchersyncer/watchersyncer.go#L76-L86

[3]

api.Syncer: https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/api/api.go#L127-L133

[4]

daemon.go file: https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go

[5]

NewAsyncCalcGraph: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/async_calc_graph.go#L96-L120

[6]

allUpdDispatcher := dispatcher.NewDispatcher() instantiates an allUpdDispatcher object: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L118-L131

[7]

local endpoints: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L133-L151

[8]

service index: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L203-L204

[9]

ipset member: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L225-L252

[10]

endpoint policy resolver: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L293-L319

[11]

RuleScanner: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/rule_scanner.go#L48-L84

[12]

dp.StartDataplaneDriver: https://github.com/projectcalico/calico/blob/v3.22.1/felix/dataplane/driver.go#L57-L390

[13]

intdataplane.NewIntDataplaneDriver: https://github.com/projectcalico/calico/blob/v3.22.1/felix/dataplane/linux/int_dataplane.go#L307-L866

[14]

PolicyManager: https://github.com/projectcalico/calico/blob/v3.22.1/felix/dataplane/linux/policy_mgr.go#L29-L40

[15]

OnUpdate: https://github.com/projectcalico/calico/blob/v3.22.1/felix/dataplane/linux/policy_mgr.go#L86-L144

[16]

Apply: https://github.com/projectcalico/calico/blob/v3.22.1/felix/iptables/table.go#L946-L1039

Original article: https://blog.crazytaxii.com/posts/k8s_calico_network_policy/

