Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): check if the altered tag value table #29380

Merged
merged 2 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions source/dnode/vnode/src/vnd/vnodeSvr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1415,7 +1415,8 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
SVAlterTbReq vAlterTbReq = {0};
SVAlterTbRsp vAlterTbRsp = {0};
SDecoder dc = {0};
int32_t rcode = 0;
int32_t code = 0;
int32_t lino = 0;
int32_t ret;
SEncoder ec = {0};
STableMetaRsp vMetaRsp = {0};
Expand All @@ -1431,15 +1432,13 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
tDecoderClear(&dc);
rcode = -1;
goto _exit;
}

// process
if (metaAlterTable(pVnode->pMeta, ver, &vAlterTbReq, &vMetaRsp) < 0) {
vAlterTbRsp.code = terrno;
tDecoderClear(&dc);
rcode = -1;
goto _exit;
}
tDecoderClear(&dc);
Expand All @@ -1449,6 +1448,31 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
vAlterTbRsp.pMeta = &vMetaRsp;
}

if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL || vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, vAlterTbReq.tbName);
if (uid == 0) {
vError("vgId:%d, %s failed at %s:%d since table %s not found", TD_VID(pVnode), __func__, __FILE__, __LINE__,
vAlterTbReq.tbName);
goto _exit;
}

SArray* tbUids = taosArrayInit(4, sizeof(int64_t));
void* p = taosArrayPush(tbUids, &uid);
TSDB_CHECK_NULL(p, code, lino, _exit, terrno);

vDebug("vgId:%d, remove tags value altered table:%s from query table list", TD_VID(pVnode), vAlterTbReq.tbName);
if ((code = tqUpdateTbUidList(pVnode->pTq, tbUids, false)) < 0) {
vError("vgId:%d, failed to remove tbUid list since %s", TD_VID(pVnode), tstrerror(code));
}

vDebug("vgId:%d, try to add table:%s in query table list", TD_VID(pVnode), vAlterTbReq.tbName);
if ((code = tqUpdateTbUidList(pVnode->pTq, tbUids, true)) < 0) {
vError("vgId:%d, failed to add tbUid list since %s", TD_VID(pVnode), tstrerror(code));
}

taosArrayDestroy(tbUids);
}

_exit:
taosArrayDestroy(vAlterTbReq.pMultiTag);
tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
Expand All @@ -1457,6 +1481,7 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
if (tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp) != 0) {
vError("vgId:%d, failed to encode alter table response", TD_VID(pVnode));
}

tEncoderClear(&ec);
if (vMetaRsp.pSchemas) {
taosMemoryFree(vMetaRsp.pSchemas);
Expand Down
2 changes: 2 additions & 0 deletions tests/parallel_test/cases.task
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,8 @@
,,y,script,./test.sh -f tsim/stream/basic2.sim
,,y,script,./test.sh -f tsim/stream/basic3.sim
,,y,script,./test.sh -f tsim/stream/basic4.sim
,,y,script,./test.sh -f tsim/stream/basic5.sim
,,y,script,./test.sh -f tsim/stream/tag.sim
,,y,script,./test.sh -f tsim/stream/snodeCheck.sim
,,y,script,./test.sh -f tsim/stream/concurrentcheckpt.sim
,,y,script,./test.sh -f tsim/stream/checkpointInterval0.sim
Expand Down
110 changes: 110 additions & 0 deletions tests/script/tsim/stream/tag.sim
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start

sleep 100
sql connect

print step1
print =============== create database
sql create database test vgroups 2;
sql use test;

sql create table st1(ts timestamp, a int, b int , c int, d double) tags(x int);
sql create table t1 using st1 tags(1);
sql create table t2 using st1 tags(2);
sql create stream streams1 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 0 WATERMARK 100s into streamt as select _wstart as s, count(*) c1 from st1 where x>=2 interval(60s) ;

run tsim/stream/checkTaskStatus.sim


sql insert into t2 values(1648791213000,0,1,1,1.0);
sql insert into t2 values(1648791213001,9,2,2,1.1);
sql insert into t2 values(1648791213009,0,3,3,1.0);

sql insert into t1 values(1648791223000,0,1,1,1.0);
sql insert into t1 values(1648791223001,9,2,2,1.1);
sql insert into t1 values(1648791223009,0,3,3,1.0);

sleep 300

sql select * from streamt;
if $data01 != 3 then
return -1
endi

sql alter table t1 set tag x=3;

sql insert into t1 values(1648791233000,0,1,1,1.0);
sql insert into t1 values(1648791233001,9,2,2,1.1);
sql insert into t1 values(1648791233009,0,3,3,1.0);

sleep 1000
sql select * from streamt;

if $data01 != 6 then
return -1
endi

sql alter table t1 set tag x=1;
sql alter table t2 set tag x=1;

sql insert into t1 values(1648791243000,0,1,1,1.0);
sql insert into t1 values(1648791243001,9,2,2,1.1);


sql select * from streamt;
if $data01 != 6 then
return -1
endi

#$loop_count = 0
#loop2:
#
#sleep 300
#print 1 sql select * from streamt;
#sql select * from streamt;
#
#print $data00 $data01 $data02 $data03
#print $data10 $data11 $data12 $data13
#
#$loop_count = $loop_count + 1
#if $loop_count == 10 then
# return -1
#endi
#
## row 0
#if $data01 != 3 then
# print ======data01=$data01
# goto loop2
#endi
#
#if $data02 != 6 then
# print ======data02=$data02
# goto loop2
#endi
#
#if $data03 != 3 then
# print ======data03=$data03
# goto loop2
#endi
#
## row 1
#if $data11 != 3 then
# print ======data11=$data11
# goto loop2
#endi
#
#if $data12 != 6 then
# print ======data12=$data12
# goto loop2
#endi
#
#if $data13 != 3 then
# print ======data13=$data13
# goto loop2
#endi
#

print tag end
system sh/exec.sh -n dnode1 -s stop -x SIGINT
4 changes: 4 additions & 0 deletions tests/script/tsim/testsuit.sim
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ run tsim/stream/distributeInterval0.sim
run tsim/stream/distributeSession0.sim
run tsim/stream/state0.sim
run tsim/stream/basic2.sim
run tsim/stream/basic3.sim
run tsim/stream/basic4.sim
run tsim/stream/basic5.sim
run tsim/stream/tag.sim
run tsim/stream/concurrentcheckpt.sim
run tsim/insert/basic1.sim
run tsim/insert/commit-merge0.sim
Expand Down
Loading