Skip to content

Commit ce060d3

Browse files
committed
Merge remote-tracking branch 'upstream/main' into mhamza/pool-reuse
Signed-off-by: Mohamed Hamza <[email protected]>
2 parents 9d81683 + 0e0f4da commit ce060d3

File tree

23 files changed

+389
-22
lines changed

23 files changed

+389
-22
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ $(PROTO_GO_OUTS): minimaltools install_protoc-gen-go proto/*.proto
286286
# This rule builds the bootstrap images for all flavors.
287287
DOCKER_IMAGES_FOR_TEST = mysql80 mysql84 percona80
288288
DOCKER_IMAGES = common $(DOCKER_IMAGES_FOR_TEST)
289-
BOOTSTRAP_VERSION=44
289+
BOOTSTRAP_VERSION=45
290290
ensure_bootstrap_version:
291291
find docker/ -type f -exec sed -i "s/^\(ARG bootstrap_version\)=.*/\1=${BOOTSTRAP_VERSION}/" {} \;
292292
sed -i 's/\(^.*flag.String(\"bootstrap-version\",\) *\"[^\"]\+\"/\1 \"${BOOTSTRAP_VERSION}\"/' test.go

build.env

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
source ./tools/shell_functions.inc
1818

1919
go version >/dev/null 2>&1 || fail "Go is not installed or is not in \$PATH. See https://vitess.io/contributing/build-from-source for install instructions."
20-
goversion_min 1.24.2 || echo "Go version reported: `go version`. Version 1.24.2+ recommended. See https://vitess.io/contributing/build-from-source for install instructions."
20+
goversion_min 1.24.3 || echo "Go version reported: `go version`. Version 1.24.3+ recommended. See https://vitess.io/contributing/build-from-source for install instructions."
2121

2222
mkdir -p dist
2323
mkdir -p bin

docker/bootstrap/CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,4 +174,8 @@ List of changes between bootstrap image versions.
174174

175175
## [44] - 2025-04-02
176176
### Changes
177-
- Update build to golang 1.24.2
177+
- Update build to golang 1.24.2
178+
179+
## [45] - 2025-05-07
180+
### Changes
181+
- Update build to golang 1.24.3

docker/bootstrap/Dockerfile.common

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM --platform=linux/amd64 golang:1.24.2-bookworm
1+
FROM --platform=linux/amd64 golang:1.24.3-bookworm
22

33
# Install Vitess build dependencies
44
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \

docker/lite/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
FROM --platform=linux/amd64 golang:1.24.2-bookworm AS builder
15+
FROM --platform=linux/amd64 golang:1.24.3-bookworm AS builder
1616

1717
# Allows docker builds to set the BUILD_NUMBER
1818
ARG BUILD_NUMBER

docker/lite/Dockerfile.mysql84

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
FROM --platform=linux/amd64 golang:1.24.2-bookworm AS builder
15+
FROM --platform=linux/amd64 golang:1.24.3-bookworm AS builder
1616

1717
# Allows docker builds to set the BUILD_NUMBER
1818
ARG BUILD_NUMBER

docker/lite/Dockerfile.percona80

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
FROM --platform=linux/amd64 golang:1.24.2-bookworm AS builder
15+
FROM --platform=linux/amd64 golang:1.24.3-bookworm AS builder
1616

1717
# Allows docker builds to set the BUILD_NUMBER
1818
ARG BUILD_NUMBER

docker/vttestserver/Dockerfile.mysql80

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
FROM --platform=linux/amd64 golang:1.24.2-bookworm AS builder
15+
FROM --platform=linux/amd64 golang:1.24.3-bookworm AS builder
1616

1717
# Allows docker builds to set the BUILD_NUMBER
1818
ARG BUILD_NUMBER

docker/vttestserver/Dockerfile.mysql84

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
FROM --platform=linux/amd64 golang:1.24.2-bookworm AS builder
15+
FROM --platform=linux/amd64 golang:1.24.3-bookworm AS builder
1616

1717
# Allows docker builds to set the BUILD_NUMBER
1818
ARG BUILD_NUMBER

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module vitess.io/vitess
22

3-
go 1.24.2
3+
go 1.24.3
44

55
require (
66
cloud.google.com/go/storage v1.52.0

go/test/endtoend/vreplication/vstream_test.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@ import (
2626
"testing"
2727
"time"
2828

29+
"github.com/stretchr/testify/assert"
2930
"github.com/stretchr/testify/require"
3031

32+
"vitess.io/vitess/go/sets"
3133
"vitess.io/vitess/go/vt/log"
3234
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
3335
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
@@ -37,6 +39,166 @@ import (
3739
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
3840
)
3941

42+
func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
43+
vc = NewVitessCluster(t, nil)
44+
defer vc.TearDown()
45+
46+
require.NotNil(t, vc)
47+
defaultReplicas = 2
48+
defaultRdonly = 0
49+
50+
defaultCell := vc.Cells[vc.CellNames[0]]
51+
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)
52+
verifyClusterHealth(t, vc)
53+
54+
ctx := context.Background()
55+
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
56+
if err != nil {
57+
log.Fatal(err)
58+
}
59+
defer vstreamConn.Close()
60+
vgtid := &binlogdatapb.VGtid{
61+
ShardGtids: []*binlogdatapb.ShardGtid{{
62+
Keyspace: "product",
63+
Shard: "0",
64+
Gtid: "",
65+
}}}
66+
67+
filter := &binlogdatapb.Filter{
68+
Rules: []*binlogdatapb.Rule{
69+
{
70+
Match: "customer",
71+
Filter: "select * from customer",
72+
}, {
73+
Match: "product",
74+
Filter: "select * from product",
75+
}, {
76+
Match: "merchant",
77+
Filter: "select * from merchant",
78+
},
79+
},
80+
}
81+
flags := &vtgatepb.VStreamFlags{
82+
TablesToCopy: []string{"product", "customer"},
83+
}
84+
id := 0
85+
vtgateConn := vc.GetVTGateConn(t)
86+
defer vtgateConn.Close()
87+
88+
// To test the copy phase, let's insert 10 rows intitally in each table
89+
// present in the filter before running the VStream.
90+
for range 10 {
91+
id++
92+
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
93+
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
94+
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
95+
}
96+
97+
// Stream events from the VStream API
98+
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
99+
require.NoError(t, err)
100+
var numRowEvents int64
101+
102+
copyPhaseCompleted := atomic.Bool{}
103+
copyPhaseCompleted.Store(false)
104+
105+
done := atomic.Bool{}
106+
done.Store(false)
107+
108+
copiedTables := make(sets.Set[string])
109+
// Start reading events from the VStream.
110+
go func() {
111+
for {
112+
evs, err := reader.Recv()
113+
switch err {
114+
case nil:
115+
for _, ev := range evs {
116+
if ev.Type == binlogdatapb.VEventType_ROW {
117+
if !copyPhaseCompleted.Load() {
118+
escapedTableNameParts := strings.Split(ev.RowEvent.TableName, ".")
119+
require.Len(t, escapedTableNameParts, 2)
120+
copiedTables.Insert(escapedTableNameParts[1])
121+
}
122+
numRowEvents++
123+
}
124+
if ev.Type == binlogdatapb.VEventType_COPY_COMPLETED {
125+
copyPhaseCompleted.Store(true)
126+
}
127+
}
128+
case io.EOF:
129+
log.Infof("Stream Ended")
130+
default:
131+
log.Infof("%s:: remote error: %v", time.Now(), err)
132+
}
133+
134+
if done.Load() {
135+
return
136+
}
137+
}
138+
}()
139+
140+
// Wait for copy phase to complete.
141+
ticker := time.NewTicker(100 * time.Millisecond)
142+
for {
143+
<-ticker.C
144+
if copyPhaseCompleted.Load() {
145+
break
146+
}
147+
}
148+
149+
stopInserting := atomic.Bool{}
150+
stopInserting.Store(false)
151+
var insertMu sync.Mutex
152+
go func() {
153+
for {
154+
if stopInserting.Load() {
155+
return
156+
}
157+
insertMu.Lock()
158+
id++
159+
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
160+
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
161+
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
162+
insertMu.Unlock()
163+
}
164+
}()
165+
166+
time.Sleep(100 * time.Millisecond)
167+
stopInserting.Store(true)
168+
time.Sleep(10 * time.Second) // Give the vstream plenty of time to catchup
169+
done.Store(true)
170+
171+
qr1 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from customer")
172+
qr2 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from product")
173+
qr3 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from merchant")
174+
require.NotNil(t, qr1)
175+
require.NotNil(t, qr2)
176+
require.NotNil(t, qr3)
177+
178+
// Total number of rows.
179+
insertedRows1, err := qr1.Rows[0][0].ToCastInt64()
180+
require.NoError(t, err)
181+
require.NotZero(t, insertedRows1)
182+
insertedRows2, err := qr2.Rows[0][0].ToCastInt64()
183+
require.NoError(t, err)
184+
require.NotZero(t, insertedRows2)
185+
insertedRows3, err := qr3.Rows[0][0].ToCastInt64()
186+
require.NoError(t, err)
187+
require.NotZero(t, insertedRows3)
188+
189+
assert.Len(t, copiedTables, 2)
190+
for _, expectedCopiedTableName := range flags.TablesToCopy {
191+
assert.Truef(t, copiedTables.Has(expectedCopiedTableName), "expected table %s to be copied", expectedCopiedTableName)
192+
}
193+
// We don't expect merchant table to be part of copy phase.
194+
assert.False(t, copiedTables.Has("merchant"), "expected table merchant not to be copied")
195+
196+
// Since we don't expect merchant table to be part of copy phase, we can
197+
// subtract 10 from the total rows found in the 3 tables.
198+
wantTotalRows := insertedRows1 + insertedRows2 + insertedRows3 - 10
199+
assert.Equal(t, wantTotalRows, numRowEvents)
200+
}
201+
40202
// Validates that we have a working VStream API
41203
// If Failover is enabled:
42204
// - We ensure that this works through active reparents and doesn't miss any events

go/vt/proto/binlogdata/binlogdata.pb.go

Lines changed: 15 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/vt/proto/binlogdata/binlogdata_vtproto.pb.go

Lines changed: 52 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)