KubeAPIServer 的存储接口实现
共 15655字,需浏览 32分钟
·
2023-06-21 23:21
上回讲到 KubeAPIServer 在核心路由注册过程中,主要分成两步,第一步创建 RESTStorage 将后端存储与资源进行绑定,第二步才进行路由注册。
路由注册已经讲了,回过头来看看 RESTStorage 的创建:
// pkg/controlplane/instance.go
// InstallLegacyAPI builds the RESTStorage for the legacy (core) API group and
// then registers its routes (route registration is omitted in this excerpt).
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter) error {
	// Create a RESTStorage for every resource in the legacy API and bind each
	// resource to the operations of its backend storage (etcd).
	// restOptionsGetter (a generic.RESTOptionsGetter) is the storage-interface
	// implementation handed down to every resource; it is examined later.
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter)
	if err != nil {
		return fmt.Errorf("error building core storage: %v", err)
	}
	// ...
	// Route registration.
	return nil
}
跳到 NewLegacyRESTStorage 方法,先为每个资源初始化 RESTStorage:
// pkg/registry/core/rest/storage_core.go
// NewLegacyRESTStorage (first excerpt) initializes the RESTStorage of each core
// (legacy) resource. Every constructor receives the same restOptionsGetter,
// i.e. the storage-interface implementation.
//
// NOTE(review): this excerpt reassigns err after each constructor without
// showing an `if err != nil` check; presumably the checks are in the omitted
// code — confirm against the real source.
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	// ...
	restStorage := LegacyRESTStorage{}
	// Every resource's initialization is passed restOptionsGetter
	// (a generic.RESTOptionsGetter), i.e. the storage-interface implementation.
	// RESTStorage for PodTemplate.
	podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
	// RESTStorage for Event; the event TTL is converted to seconds (uint64).
	eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
	// RESTStorage for LimitRange.
	limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
	// RESTStorage for ResourceQuota (main resource plus its status subresource).
	resourceQuotaStorage, resourceQuotaStatusStorage, err := resourcequotastore.NewREST(restOptionsGetter)
	// RESTStorage for Secret.
	secretStorage, err := secretstore.NewREST(restOptionsGetter)
	// RESTStorage for PersistentVolume (main resource plus status).
	persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
	// RESTStorage for PersistentVolumeClaim (main resource plus status).
	persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
	// RESTStorage for ConfigMap.
	configMapStorage, err := configmapstore.NewREST(restOptionsGetter)
	// RESTStorage for Namespace (main resource plus status and finalize).
	namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage, err := namespacestore.NewREST(restOptionsGetter)
	// RESTStorage for Endpoints.
	endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
	// RESTStorage for Node.
	nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
	// RESTStorage for Pod. It uses nodeStorage.KubeletConnectionInfo, so the
	// Node storage has to be created first.
	podStorage, err := podstore.NewStorage(
		restOptionsGetter,
		nodeStorage.KubeletConnectionInfo,
		c.ProxyTransport,
		podDisruptionClient,
	)
	// RESTStorage for ServiceAccount. The token-related dependencies (issuer,
	// audiences, max expiration, pod/secret stores) are wired in only when a
	// ServiceAccountIssuer is configured; otherwise they are nil/zero.
	var serviceAccountStorage *serviceaccountstore.REST
	if c.ServiceAccountIssuer != nil {
		serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, c.ServiceAccountIssuer, c.APIAudiences, c.ServiceAccountMaxExpiration, podStorage.Pod.Store, secretStorage.Store, c.ExtendExpiration)
	} else {
		serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, nil, nil, 0, nil, nil, false)
	}
	// ......
}
再将资源和对应的 RESTStorage 进行绑定:
// pkg/registry/core/rest/storage_core.go
// NewLegacyRESTStorage (second excerpt) binds each enabled core resource, and
// its subresource paths, to the RESTStorage created earlier in the function.
// The resulting path -> RESTStorage map is stored in apiGroupInfo under
// version "v1" and returned for route registration.
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	// ......
	// Map from a resource's HTTP path (e.g. "pods", "pods/status") to its RESTStorage.
	storage := map[string]rest.Storage{}
	if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = podStorage.Pod
		// Pods expose many subresource paths, each with its own RESTStorage.
		storage[resource+"/attach"] = podStorage.Attach
		storage[resource+"/status"] = podStorage.Status
		storage[resource+"/log"] = podStorage.Log
		storage[resource+"/exec"] = podStorage.Exec
		storage[resource+"/portforward"] = podStorage.PortForward
		storage[resource+"/proxy"] = podStorage.Proxy
		storage[resource+"/binding"] = podStorage.Binding
		// Eviction storage is optional; register the path only when it exists.
		if podStorage.Eviction != nil {
			storage[resource+"/eviction"] = podStorage.Eviction
		}
		storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
	}
	if resource := "bindings"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = podStorage.LegacyBinding
	}
	if resource := "podtemplates"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = podTemplateStorage
	}
	if resource := "replicationcontrollers"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = controllerStorage.Controller
		storage[resource+"/status"] = controllerStorage.Status
		// The /scale subresource is only served when autoscaling/v1 is
		// registered in the legacy scheme.
		if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
			storage[resource+"/scale"] = controllerStorage.Scale
		}
	}
	if resource := "services"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = serviceRESTStorage
		storage[resource+"/proxy"] = serviceRESTProxy
		storage[resource+"/status"] = serviceStatusStorage
	}
	if resource := "endpoints"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = endpointsStorage
	}
	if resource := "nodes"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = nodeStorage.Node
		storage[resource+"/proxy"] = nodeStorage.Proxy
		storage[resource+"/status"] = nodeStorage.Status
	}
	if resource := "events"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = eventStorage
	}
	if resource := "limitranges"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = limitRangeStorage
	}
	if resource := "resourcequotas"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = resourceQuotaStorage
		storage[resource+"/status"] = resourceQuotaStatusStorage
	}
	if resource := "namespaces"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = namespaceStorage
		storage[resource+"/status"] = namespaceStatusStorage
		storage[resource+"/finalize"] = namespaceFinalizeStorage
	}
	if resource := "secrets"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = secretStorage
	}
	if resource := "serviceaccounts"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = serviceAccountStorage
		// The /token subresource exists only when the ServiceAccount storage
		// was built with a Token REST.
		if serviceAccountStorage.Token != nil {
			storage[resource+"/token"] = serviceAccountStorage.Token
		}
	}
	if resource := "persistentvolumes"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = persistentVolumeStorage
		storage[resource+"/status"] = persistentVolumeStatusStorage
	}
	if resource := "persistentvolumeclaims"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = persistentVolumeClaimStorage
		storage[resource+"/status"] = persistentVolumeClaimStatusStorage
	}
	if resource := "configmaps"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = configMapStorage
	}
	if resource := "componentstatuses"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate)
	}
	// Publish the map under version "v1" only if at least one resource is enabled.
	if len(storage) > 0 {
		apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
	}
	return restStorage, apiGroupInfo, nil
}
存储了所有资源路径和对应 RESTStorage 的 map 结构最后会保存在 apiGroupInfo 中并返回给后续的路由注册使用(见上回)。这样后续的 handler 就可以通过对应的 RESTStorage 来操作 etcd 存取数据,同时这种做法还可以划分每个资源对 Storage 操作的权限,保障数据的安全性。
继续以 Pod 资源的 RESTStorage 初始化为例,来到 podstore.NewStorage 方法:
// pkg/registry/core/pod/storage/storage.go
// NewStorage builds the RESTStorage for the Pod resource and all of its
// subresources. It creates a single genericregistry.Store, initializes it
// from optsGetter via CompleteWithOptions, and wraps it (or shallow copies of
// it with different strategies) in the per-path REST types of PodStorage.
//
// Parameters:
//   - optsGetter: the storage-interface implementation (generic.RESTOptionsGetter).
//   - k: connection info for kubelets, used by log/exec/attach/portforward.
//   - proxyTransport: HTTP transport used by the Pod and proxy REST types.
//   - podDisruptionBudgetClient: used by the eviction storage.
//
// Returns an empty PodStorage and the error if store initialization fails.
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
	// Create the Store instance: object factories, resource names, and the
	// create/update/delete strategies for Pods.
	store := &genericregistry.Store{
		NewFunc:                   func() runtime.Object { return &api.Pod{} },
		NewListFunc:               func() runtime.Object { return &api.PodList{} },
		PredicateFunc:             registrypod.MatchPod,
		DefaultQualifiedResource:  api.Resource("pods"),
		SingularQualifiedResource: api.Resource("pod"),
		CreateStrategy:            registrypod.Strategy,
		UpdateStrategy:            registrypod.Strategy,
		DeleteStrategy:            registrypod.Strategy,
		ResetFieldsStrategy:       registrypod.Strategy,
		ReturnDeletedObject:       true,
		TableConvertor:            printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
	}
	// Store options.
	options := &generic.StoreOptions{
		// RESTOptions is the storage-interface implementation passed in.
		RESTOptions: optsGetter,
		AttrFunc:    registrypod.GetAttrs,
		TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
		Indexers:    registrypod.Indexers(),
	}
	// Initialize the Store instance from options.
	if err := store.CompleteWithOptions(options); err != nil {
		return PodStorage{}, err
	}
	// Bind the Store to each Pod path and return the PodStorage.
	// The status and ephemeral-container stores are shallow copies taken AFTER
	// CompleteWithOptions, so they carry the already-initialized storage fields
	// and differ only in the strategies overridden below.
	statusStore := *store
	statusStore.UpdateStrategy = registrypod.StatusStrategy
	statusStore.ResetFieldsStrategy = registrypod.StatusStrategy
	ephemeralContainersStore := *store
	ephemeralContainersStore.UpdateStrategy = registrypod.EphemeralContainersStrategy
	bindingREST := &BindingREST{store: store}
	return PodStorage{
		Pod:                 &REST{store, proxyTransport},
		Binding:             &BindingREST{store: store},
		LegacyBinding:       &LegacyBindingREST{bindingREST},
		Eviction:            newEvictionStorage(&statusStore, podDisruptionBudgetClient),
		Status:              &StatusREST{store: &statusStore},
		EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
		Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
		Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
		Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
		Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
		PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
	}, nil
}
跳到初始化 Store 实例的 store.CompleteWithOptions 方法:
// k8s.io/apiserver/pkg/registry/generic/registry/store.go
// CompleteWithOptions finishes initializing the Store from options. It asks
// options.RESTOptions (the storage-interface implementation) for this
// resource's RESTOptions, copies settings such as DeleteCollectionWorkers and
// EnableGarbageCollection onto the Store, and, if no backing storage has been
// set yet, builds one with opts.Decorator.
//
// Returns the error from GetRESTOptions or from the decorator, if any.
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
	// ...
	// RESTOptions is the storage-interface implementation; GetRESTOptions
	// returns the storage options (opts) for this resource.
	opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
	if err != nil {
		return err
	}
	// Copy values from opts onto e (the Store) to complete its initialization.
	// An explicitly configured DeleteCollectionWorkers is kept.
	if e.DeleteCollectionWorkers == 0 {
		e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
	}
	e.EnableGarbageCollection = opts.EnableGarbageCollection
	// ...
	if e.Storage.Storage == nil {
		e.Storage.Codec = opts.StorageConfig.Codec
		var err error
		// The decorator builds the actual storage.Interface (raw etcd3 storage,
		// or a watch cache on top of it) and the matching destroy function.
		e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
			opts.StorageConfig,
			prefix,
			keyFunc,
			e.NewFunc,
			e.NewListFunc,
			attrFunc,
			options.TriggerFunc,
			options.Indexers,
		)
		if err != nil {
			return err
		}
		e.StorageVersioner = opts.StorageConfig.EncodeVersioner
		// When object-count polling is enabled, wrap DestroyFunc so that it
		// stops the observer and then calls the previous destroy function;
		// sync.Once guarantees this runs at most once.
		if opts.CountMetricPollPeriod > 0 {
			stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker)
			previousDestroy := e.DestroyFunc
			var once sync.Once
			e.DestroyFunc = func() {
				once.Do(func() {
					stopFunc()
					if previousDestroy != nil {
						previousDestroy()
					}
				})
			}
		}
	}
	return nil
}
到这里,Pod 资源的 RESTStorage 初始化完成。
接下来看看前面一直提到的存储接口 generic.RESTOptionsGetter,它是一个接口类型:
// k8s.io/apiserver/pkg/registry/generic/options.go
// RESTOptionsGetter is the storage interface used by resource storages: given
// a resource, it returns the RESTOptions (storage config, storage decorator,
// resource prefix, and related settings) needed to build that resource's
// backend storage.
type RESTOptionsGetter interface {
	GetRESTOptions(resource schema.GroupResource) (RESTOptions, error)
}
要找到 KubeAPIServer 对应的实现,我们要回到最开始为 KubeAPIServer 创建配置的地方,即第 2 回中创建服务调用链的地方:
// cmd/kube-apiserver/app/server.go
// CreateServerChain builds the apiserver chain. As shown in this excerpt, its
// first step is creating the KubeAPIServer configuration with
// CreateKubeAPIServerConfig; any error there is returned immediately.
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
	// Create the KubeAPIServer configuration.
	kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
	if err != nil {
		return nil, err
	}
	// ...
	return aggregatorServer, nil
}
// cmd/kube-apiserver/app/server.go
// CreateKubeAPIServerConfig builds the controlplane.Config for KubeAPIServer.
// Its generic config, including the RESTOptionsGetter, comes from
// buildGenericConfig.
func CreateKubeAPIServerConfig(s completedServerRunOptions) (
	*controlplane.Config,
	aggregatorapiserver.ServiceResolver,
	[]admission.PluginInitializer,
	error,
) {
	proxyTransport := CreateProxyTransport()
	// Continue into buildGenericConfig.
	genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
	// ...
	return config, serviceResolver, pluginInitializers, nil
}
// cmd/kube-apiserver/app/server.go
// buildGenericConfig builds the generic apiserver config. Among other steps,
// it calls s.Etcd.ApplyWithStorageFactoryTo, which initializes
// genericConfig.RESTOptionsGetter. The results are named, so on failure the
// function returns right away with lastErr set.
func buildGenericConfig(
	s *options.ServerRunOptions,
	proxyTransport *http.Transport,
) (
	genericConfig *genericapiserver.Config,
	versionedInformers clientgoinformers.SharedInformerFactory,
	serviceResolver aggregatorapiserver.ServiceResolver,
	pluginInitializers []admission.PluginInitializer,
	admissionPostStartHook genericapiserver.PostStartHookFunc,
	storageFactory *serverstorage.DefaultStorageFactory,
	lastErr error,
) {
	// ...
	// Initialize the RESTOptionsGetter.
	if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
		return
	}
	// ...
	return
}
// k8s.io/apiserver/pkg/server/options/etcd.go
// ApplyWithStorageFactoryTo applies the etcd options to the server config c.
// In particular, it sets c.RESTOptionsGetter to a
// StorageFactoryRestOptionsFactory holding a copy of these options and the
// given storage factory.
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
	//...
	// The RESTOptionsGetter implementation is StorageFactoryRestOptionsFactory.
	c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
	return nil
}
总结下完整的调用链:
存储接口的实现:
CreateServerChain(创建服务调用链)→ CreateKubeAPIServerConfig(创建 kubeAPIServerConfig 配置)→ buildGenericConfig(生成 kubeAPIServerConfig 配置中的 genericConfig)→ ApplyWithStorageFactoryTo(初始化 genericConfig 中的 RESTOptionsGetter)
存储接口的使用:
CreateServerChain(创建服务调用链)→ CreateKubeAPIServer(初始化服务,传入 kubeAPIServerConfig 配置)→ kubeAPIServerConfig.Complete().New(delegateAPIServer) → InstallLegacyAPI(核心路由注册,传入存储实现 kubeAPIServerConfig.GenericConfig.RESTOptionsGetter)
回到刚才的代码可以得知,RESTOptionsGetter 这个存储接口是由 StorageFactoryRestOptionsFactory 来实现的:
// k8s.io/apiserver/pkg/server/options/etcd.go
// StorageFactoryRestOptionsFactory is an implementation of the
// RESTOptionsGetter interface. It pairs the etcd options with a StorageFactory
// that produces the per-resource storage configuration.
type StorageFactoryRestOptionsFactory struct {
	// Options holds the etcd options (workers, GC, watch cache settings, ...).
	Options EtcdOptions
	// StorageFactory yields the storage config and key prefix per resource.
	StorageFactory serverstorage.StorageFactory
}
// GetRESTOptions implements RESTOptionsGetter. It resolves the storage config
// for resource from the StorageFactory and fills in the RESTOptions used to
// build that resource's storage.
//
// Decorator selection:
//   - watch cache disabled: raw storage (generic.UndecoratedStorage);
//   - watch cache enabled and the resource has a configured size <= 0: raw storage;
//   - otherwise: a watch cache in front of etcd (genericregistry.StorageWithCacher).
//
// A configured size > 0 is ignored with a warning, because the watch cache
// size is dynamic.
//
// Returns an error if no storage destination exists for resource, or if the
// configured watch cache sizes cannot be parsed.
func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		// %w keeps err in the chain for errors.Is/errors.As; the message text
		// is the same as formatting err.Error() with %v.
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %w", resource, err)
	}

	ret := generic.RESTOptions{
		StorageConfig:             storageConfig,
		Decorator:                 generic.UndecoratedStorage,
		DeleteCollectionWorkers:   f.Options.DeleteCollectionWorkers,
		EnableGarbageCollection:   f.Options.EnableGarbageCollection,
		ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:     f.Options.StorageConfig.CountMetricPollPeriod,
		StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
	}

	if f.Options.EnableWatchCache {
		sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
		if err != nil {
			return generic.RESTOptions{}, err
		}
		size, ok := sizes[resource]
		if ok && size > 0 {
			klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
		}
		if ok && size <= 0 {
			klog.V(3).InfoS("Not using watch cache", "resource", resource)
			// Storage implementation without the watch cache.
			ret.Decorator = generic.UndecoratedStorage
		} else {
			klog.V(3).InfoS("Using watch cache", "resource", resource)
			// Storage implementation with the watch cache.
			ret.Decorator = genericregistry.StorageWithCacher()
		}
	}

	return ret, nil
}
不论是否使用带 watch cache 的 Storage 实现,最终都会调用 generic.NewRawStorage 方法:
// UndecoratedStorage is the storage decorator that adds no caching layer.
// Of its arguments it uses only config and newFunc, forwarding them unchanged
// to NewRawStorage and returning that result as-is.
func UndecoratedStorage(
	config *storagebackend.ConfigForResource,
	resourcePrefix string,
	keyFunc func(obj runtime.Object) (string, error),
	newFunc func() runtime.Object,
	newListFunc func() runtime.Object,
	getAttrsFunc storage.AttrFunc,
	trigger storage.IndexerFuncs,
	indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
	rawStorage, destroy, err := NewRawStorage(config, newFunc)
	return rawStorage, destroy, err
}
// StorageWithCacher returns a storage decorator that first builds the raw
// backend storage with generic.NewRawStorage and then (in the omitted code)
// builds a watch cache ("cacher") on top of it. The decorator returns the
// cacher and its destroy function.
func StorageWithCacher() generic.StorageDecorator {
	return func(
		storageConfig *storagebackend.ConfigForResource,
		resourcePrefix string,
		keyFunc func(obj runtime.Object) (string, error),
		newFunc func() runtime.Object,
		newListFunc func() runtime.Object,
		getAttrsFunc storage.AttrFunc,
		triggerFuncs storage.IndexerFuncs,
		indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
		// The same raw storage path that UndecoratedStorage uses.
		s, d, err := generic.NewRawStorage(storageConfig, newFunc)
		//...
		return cacher, destroyFunc, nil
	}
}
再来看 NewRawStorage 方法:
// k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
// NewRawStorage builds the undecorated backend storage for config by handing
// a copy of the config to factory.Create; it adds nothing of its own.
func NewRawStorage(config *storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) {
	cfg := *config
	return factory.Create(cfg, newFunc)
}
// k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
// Create builds the storage.Interface for the backend named by c.Type.
// Only etcd v3 is supported, and an unset type falls back to it. etcd2 is
// rejected as no longer supported; any other value is reported as unknown.
func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
	switch backend := c.Type; backend {
	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
		// Only etcd v3 is supported.
		return newETCD3Storage(c, newFunc)
	case storagebackend.StorageTypeETCD2:
		return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", backend)
	}
	return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
来到终点站 newETCD3Storage 方法:
// k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
// newETCD3Storage creates an etcd v3 client from c.Transport and returns the
// etcd3-backed storage.Interface for this resource (built by etcd3.New from
// the client, codec, prefix, group resource, transformer, paging and lease
// settings) together with its destroy function.
func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
	// ...
	// Create the etcd v3 client.
	client, err := newETCD3Client(c.Transport)
	if err != nil {
		// stopCompactor comes from the omitted code above; presumably it stops
		// a compactor started there before the error is returned — confirm.
		stopCompactor()
		return nil, nil, err
	}
	// ...
	return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
}
// newETCD3Client builds a clientv3.Config from the transport settings
// (endpoints from c.ServerList, plus the TLS config, dial/keepalive timeouts,
// dial options and logger prepared in the omitted code) and creates the client
// with the official etcd library go.etcd.io/etcd/client/v3.
//
// NOTE(review): declared as a package-level func variable rather than a func
// declaration, presumably so it can be swapped out (e.g. in tests) — confirm.
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
	// ...
	cfg := clientv3.Config{
		DialTimeout:          dialTimeout,
		DialKeepAliveTime:    keepaliveTime,
		DialKeepAliveTimeout: keepaliveTimeout,
		DialOptions:          dialOptions,
		Endpoints:            c.ServerList,
		TLS:                  tlsConfig,
		Logger:               etcd3ClientLogger,
	}
	// Call the official etcd client library go.etcd.io/etcd/client/v3.
	return clientv3.New(cfg)
}
至此,etcd 的初始化便完成。