Skip to content

Commit

Permalink
global sort: add boundaries to split keys when generating plan (#58323) (#58356)
Browse files Browse the repository at this point in the history

close #58267
  • Loading branch information
lance6716 authored Dec 18, 2024
1 parent 678b713 commit d13e52e
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 69 deletions.
6 changes: 5 additions & 1 deletion pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func splitSubtaskMetaForOneKVMetaGroup(
startKey := kvMeta.StartKey
var endKey kv.Key
for {
endKeyOfGroup, dataFiles, statFiles, interiorRangeJobKeys, regionSplitKeys, err := splitter.SplitOneRangesGroup()
endKeyOfGroup, dataFiles, statFiles, interiorRangeJobKeys, interiorRegionSplitKeys, err := splitter.SplitOneRangesGroup()
if err != nil {
return nil, err
}
Expand All @@ -468,6 +468,10 @@ func splitSubtaskMetaForOneKVMetaGroup(
rangeJobKeys = append(rangeJobKeys, startKey)
rangeJobKeys = append(rangeJobKeys, interiorRangeJobKeys...)
rangeJobKeys = append(rangeJobKeys, endKey)
regionSplitKeys := make([][]byte, 0, len(interiorRegionSplitKeys)+2)
regionSplitKeys = append(regionSplitKeys, startKey)
regionSplitKeys = append(regionSplitKeys, interiorRegionSplitKeys...)
regionSplitKeys = append(regionSplitKeys, endKey)
m := &BackfillSubTaskMeta{
MetaGroups: []*external.SortedKVMeta{{
StartKey: startKey,
Expand Down
4 changes: 3 additions & 1 deletion pkg/disttask/importinto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ go_test(
embed = [":importinto"],
flaky = True,
race = "on",
shard_count = 16,
shard_count = 17,
deps = [
"//br/pkg/storage",
"//pkg/ddl",
"//pkg/disttask/framework/planner",
"//pkg/disttask/framework/proto",
Expand Down Expand Up @@ -124,6 +125,7 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_stretchr_testify//suite",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//:zap",
],
Expand Down
146 changes: 83 additions & 63 deletions pkg/disttask/importinto/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,71 +372,88 @@ func generateWriteIngestSpecs(planCtx planner.PlanCtx, p *LogicalPlan) ([]planne

specs := make([]planner.PipelineSpec, 0, 16)
for kvGroup, kvMeta := range kvMetas {
splitter, err1 := getRangeSplitter(ctx, controller.GlobalSortStore, kvMeta)
if err1 != nil {
return nil, err1
specsForOneSubtask, err3 := splitForOneSubtask(ctx, controller.GlobalSortStore, kvGroup, kvMeta, ts)
if err3 != nil {
return nil, err3
}
specs = append(specs, specsForOneSubtask...)
}
return specs, nil
}

err1 = func() error {
defer func() {
err2 := splitter.Close()
if err2 != nil {
logutil.Logger(ctx).Warn("close range splitter failed", zap.Error(err2))
}
}()
startKey := tidbkv.Key(kvMeta.StartKey)
var endKey tidbkv.Key
for {
endKeyOfGroup, dataFiles, statFiles, interiorRangeJobKeys, regionSplitKeys, err2 := splitter.SplitOneRangesGroup()
if err2 != nil {
return err2
}
if len(endKeyOfGroup) == 0 {
endKey = kvMeta.EndKey
} else {
endKey = tidbkv.Key(endKeyOfGroup).Clone()
}
logutil.Logger(ctx).Info("kv range as subtask",
zap.String("startKey", hex.EncodeToString(startKey)),
zap.String("endKey", hex.EncodeToString(endKey)),
zap.Int("dataFiles", len(dataFiles)))
if startKey.Cmp(endKey) >= 0 {
return errors.Errorf("invalid kv range, startKey: %s, endKey: %s",
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}
rangeJobKeys := make([][]byte, 0, len(interiorRangeJobKeys)+2)
rangeJobKeys = append(rangeJobKeys, startKey)
rangeJobKeys = append(rangeJobKeys, interiorRangeJobKeys...)
rangeJobKeys = append(rangeJobKeys, endKey)
// each subtask will write and ingest one range group
m := &WriteIngestStepMeta{
KVGroup: kvGroup,
SortedKVMeta: external.SortedKVMeta{
StartKey: startKey,
EndKey: endKey,
// this is actually an estimate, we don't know the exact size of the data
TotalKVSize: uint64(config.DefaultBatchSize),
},
DataFiles: dataFiles,
StatFiles: statFiles,
RangeJobKeys: rangeJobKeys,
RangeSplitKeys: regionSplitKeys,
TS: ts,
}
specs = append(specs, &WriteIngestSpec{m})

startKey = endKey
if len(endKeyOfGroup) == 0 {
break
}
}
return nil
}()
if err1 != nil {
return nil, err1
// splitForOneSubtask splits the sorted KV data described by kvMeta into range
// groups and builds one WriteIngestSpec per group; each subtask will write and
// ingest one range group.
//
// Both RangeJobKeys and RangeSplitKeys in the returned specs include the
// group's start/end boundary keys in addition to the interior keys reported by
// the splitter.
func splitForOneSubtask(
	ctx context.Context,
	extStorage storage.ExternalStorage,
	kvGroup string,
	kvMeta *external.SortedKVMeta,
	ts uint64,
) ([]planner.PipelineSpec, error) {
	splitter, err := getRangeSplitter(ctx, extStorage, kvMeta)
	if err != nil {
		return nil, err
	}
	defer func() {
		// best-effort close; a failure here does not invalidate the specs
		err3 := splitter.Close()
		if err3 != nil {
			logutil.Logger(ctx).Warn("close range splitter failed", zap.Error(err3))
		}
	}()

	ret := make([]planner.PipelineSpec, 0, 16)

	startKey := tidbkv.Key(kvMeta.StartKey)
	var endKey tidbkv.Key
	for {
		endKeyOfGroup, dataFiles, statFiles, interiorRangeJobKeys, interiorRegionSplitKeys, err2 := splitter.SplitOneRangesGroup()
		if err2 != nil {
			return nil, err2
		}
		// an empty endKeyOfGroup marks the last group; close it with the
		// overall end key of kvMeta
		if len(endKeyOfGroup) == 0 {
			endKey = kvMeta.EndKey
		} else {
			endKey = tidbkv.Key(endKeyOfGroup).Clone()
		}
		logutil.Logger(ctx).Info("kv range as subtask",
			zap.String("startKey", hex.EncodeToString(startKey)),
			zap.String("endKey", hex.EncodeToString(endKey)),
			zap.Int("dataFiles", len(dataFiles)))
		if startKey.Cmp(endKey) >= 0 {
			return nil, errors.Errorf("invalid kv range, startKey: %s, endKey: %s",
				hex.EncodeToString(startKey), hex.EncodeToString(endKey))
		}
		// surround the interior keys with the group boundaries
		rangeJobKeys := make([][]byte, 0, len(interiorRangeJobKeys)+2)
		rangeJobKeys = append(rangeJobKeys, startKey)
		rangeJobKeys = append(rangeJobKeys, interiorRangeJobKeys...)
		rangeJobKeys = append(rangeJobKeys, endKey)

		regionSplitKeys := make([][]byte, 0, len(interiorRegionSplitKeys)+2)
		regionSplitKeys = append(regionSplitKeys, startKey)
		regionSplitKeys = append(regionSplitKeys, interiorRegionSplitKeys...)
		regionSplitKeys = append(regionSplitKeys, endKey)
		// each subtask will write and ingest one range group
		m := &WriteIngestStepMeta{
			KVGroup: kvGroup,
			SortedKVMeta: external.SortedKVMeta{
				StartKey: startKey,
				EndKey:   endKey,
				// this is actually an estimate, we don't know the exact size of the data
				TotalKVSize: uint64(config.DefaultBatchSize),
			},
			DataFiles:      dataFiles,
			StatFiles:      statFiles,
			RangeJobKeys:   rangeJobKeys,
			RangeSplitKeys: regionSplitKeys,
			TS:             ts,
		}
		ret = append(ret, &WriteIngestSpec{m})

		startKey = endKey
		if len(endKeyOfGroup) == 0 {
			break
		}
	}

	return ret, nil
}

func getSortedKVMetasOfEncodeStep(subTaskMetas [][]byte) (map[string]*external.SortedKVMeta, error) {
Expand Down Expand Up @@ -508,8 +525,11 @@ func getSortedKVMetasForIngest(planCtx planner.PlanCtx, p *LogicalPlan) (map[str
return kvMetasOfMergeSort, nil
}

func getRangeSplitter(ctx context.Context, store storage.ExternalStorage, kvMeta *external.SortedKVMeta) (
*external.RangeSplitter, error) {
func getRangeSplitter(
ctx context.Context,
store storage.ExternalStorage,
kvMeta *external.SortedKVMeta,
) (*external.RangeSplitter, error) {
regionSplitSize, regionSplitKeys, err := importer.GetRegionSplitSizeKeys(ctx)
if err != nil {
logutil.Logger(ctx).Warn("fail to get region split size and keys", zap.Error(err))
Expand Down
58 changes: 58 additions & 0 deletions pkg/disttask/importinto/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ import (
"fmt"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/planner"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
)

func TestLogicalPlan(t *testing.T) {
Expand Down Expand Up @@ -283,3 +287,57 @@ func TestGetSortedKVMetas(t *testing.T) {
require.Equal(t, []byte("i1_0_a"), allKVMetas["1"].StartKey)
require.Equal(t, []byte("i1_2_c"), allKVMetas["1"].EndKey)
}

// TestSplitForOneSubtask writes ~140MB of mocked external KV data, runs
// splitForOneSubtask on it, and checks that a single WriteIngestSpec is
// produced whose region split keys include the start/end boundary keys.
func TestSplitForOneSubtask(t *testing.T) {
	ctx := context.Background()
	workDir := t.TempDir()
	store, err := storage.NewLocalStorage(workDir)
	require.NoError(t, err)

	// about 140MB data: 140 keys, each paired with a 1MiB value
	const kvCount = 140
	largeValue := make([]byte, 1024*1024)
	keys := make([][]byte, 0, kvCount)
	values := make([][]byte, 0, kvCount)
	for i := 0; i < kvCount; i++ {
		keys = append(keys, []byte(fmt.Sprintf("%05d", i)))
		values = append(values, largeValue)
	}

	var multiFileStat []external.MultipleFilesStat
	writer := external.NewWriterBuilder().
		SetMemorySizeLimit(40 * 1024 * 1024).
		SetBlockSize(20 * 1024 * 1024).
		SetPropSizeDistance(5 * 1024 * 1024).
		SetPropKeysDistance(5).
		SetOnCloseFunc(func(s *external.WriterSummary) {
			multiFileStat = s.MultipleFilesStats
		}).
		Build(store, "/mock-test", "0")
	_, _, err = external.MockExternalEngineWithWriter(
		store, writer, "/mock-test", keys, values,
	)
	require.NoError(t, err)

	kvMeta := &external.SortedKVMeta{
		StartKey:           keys[0],
		EndKey:             kv.Key(keys[kvCount-1]).Next(),
		MultipleFilesStats: multiFileStat,
	}

	// Fail PD client creation so getRangeSplitter falls back after logging a
	// warning instead of contacting a real PD.
	origNewClient := importer.NewClientWithContext
	t.Cleanup(func() {
		importer.NewClientWithContext = origNewClient
	})
	importer.NewClientWithContext = func(context.Context, []string, pd.SecurityOption, ...pd.ClientOption) (pd.Client, error) {
		return nil, errors.New("mock error")
	}

	specs, err := splitForOneSubtask(ctx, store, "test-group", kvMeta, 123)
	require.NoError(t, err)

	require.Len(t, specs, 1)
	writeSpec := specs[0].(*WriteIngestSpec)
	require.Equal(t, "test-group", writeSpec.KVGroup)
	// boundaries must be present: first key, an interior split key, and the
	// exclusive end key (last key + "\x00")
	require.Equal(t, [][]byte{
		[]byte("00000"), []byte("00096"), []byte("00139\x00"),
	}, writeSpec.RangeSplitKeys)
}
7 changes: 4 additions & 3 deletions pkg/lightning/backend/external/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,15 @@ func (r *RangeSplitter) Close() error {
// but it will be nil when the group is the last one. `dataFiles` and `statFiles`
// are all the files that have overlapping key ranges in this group.
// `interiorRangeJobKeys` are the interior boundary keys of the range jobs, the
// range can be constructed with start/end key at caller. `regionSplitKeys` are
// the split keys that will be used later to split regions.
// range can be constructed with start/end key at caller.
// `interiorRegionSplitKeys` are the split keys that will be used later to split
// regions.
func (r *RangeSplitter) SplitOneRangesGroup() (
endKeyOfGroup []byte,
dataFiles []string,
statFiles []string,
interiorRangeJobKeys [][]byte,
regionSplitKeys [][]byte,
interiorRegionSplitKeys [][]byte,
err error,
) {
var (
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,7 @@ func (local *Backend) ImportEngine(

log.FromContext(ctx).Info("start import engine",
zap.Stringer("uuid", engineUUID),
zap.Int("region ranges", len(splitKeys)),
zap.Int("region ranges", len(splitKeys)-1),
zap.Int64("count", lfLength),
zap.Int64("size", lfTotalSize))

Expand Down
3 changes: 3 additions & 0 deletions pkg/lightning/common/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Engine interface {
// keys that can be used as region split keys. If the duplicate detection is
// enabled, the keys stored in engine are encoded by duplicate detection but the
// returned keys should not be encoded.
//
// Currently, the start/end key of this import should also be included in the
// returned split keys.
GetRegionSplitKeys() ([][]byte, error)
Close() error
}

0 comments on commit d13e52e

Please sign in to comment.