Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/ts 5436 3.0 acore rpc #29233

Open
wants to merge 73 commits into
base: 3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
6450116
feat: support run taos/taosd in single process
kailixu Sep 27, 2024
62764b9
Merge branch '3.0' into feat/TS-5436-3.0
kailixu Sep 27, 2024
2069716
feat: support run taos/taosd in single process
kailixu Sep 29, 2024
fca2c87
Merge branch '3.0' into feat/TS-5436-3.0
kailixu Sep 29, 2024
b51c8f3
feat: support run taos/taosd in single process
kailixu Sep 29, 2024
155200f
feat: support run taos/taosd in single process
kailixu Sep 29, 2024
cd77254
enh: code optimization
kailixu Sep 29, 2024
88d82a5
feat: support run taos/taosd in single process
kailixu Sep 29, 2024
33e5dae
feat: support run taos/taosd in single process
kailixu Sep 29, 2024
ae82fcb
Merge branch '3.0' into feat/TS-5436-3.0
kailixu Sep 29, 2024
32ff35d
feat: support run taos/taosd in single process
kailixu Sep 29, 2024
684f1a0
fix: run taos shell with a file
kailixu Sep 29, 2024
44f78f1
Merge branch '3.0' into feat/TS-5436-3.0
kailixu Sep 29, 2024
3607f81
fix: typo of shell engine
kailixu Sep 29, 2024
bb75004
enh: code optimization
kailixu Sep 29, 2024
805c1e0
add rpc demo
yihaoDeng Oct 17, 2024
4a28bb0
add rpc demo
yihaoDeng Oct 17, 2024
5357fd8
add rpc demo
yihaoDeng Oct 17, 2024
dc8e5e4
add rpc demo
yihaoDeng Oct 17, 2024
fd7d94a
add rpc demo
yihaoDeng Oct 17, 2024
14dddc6
add rpc demo
yihaoDeng Oct 17, 2024
954754c
add rpc demo
yihaoDeng Oct 18, 2024
03f39e3
add rpc demo
yihaoDeng Oct 18, 2024
4637496
add rpc demo
yihaoDeng Oct 21, 2024
737e50c
chore: merge 3.0
kailixu Dec 6, 2024
938fae1
support acore system
yihaoDeng Dec 12, 2024
e2f9423
refactor code
yihaoDeng Dec 18, 2024
f9502f9
refactor code
yihaoDeng Dec 18, 2024
3ec8f60
refactor code
yihaoDeng Dec 18, 2024
80a0546
refactor code
yihaoDeng Dec 18, 2024
4546dbe
refactor code
yihaoDeng Dec 18, 2024
9ff8faa
refactor code
yihaoDeng Dec 18, 2024
963194b
refactor code
yihaoDeng Dec 18, 2024
f0ee1bc
refactor code
yihaoDeng Dec 18, 2024
ddb31c5
refactor code
yihaoDeng Dec 18, 2024
5776a8b
refactor code
yihaoDeng Dec 18, 2024
26d3039
refactor code
yihaoDeng Dec 19, 2024
1b3a68e
refactor code
yihaoDeng Dec 19, 2024
6efc258
refactor code
yihaoDeng Dec 19, 2024
9b36322
refactor code
yihaoDeng Dec 19, 2024
696f40f
refactor code
yihaoDeng Dec 19, 2024
d5666fa
refactor code
yihaoDeng Dec 19, 2024
85e9b7d
refactor code
yihaoDeng Dec 20, 2024
77ada93
refactor code
yihaoDeng Dec 20, 2024
3981b16
support select
yihaoDeng Dec 20, 2024
f92cf74
support select
yihaoDeng Dec 20, 2024
71a4ae0
support select
yihaoDeng Dec 20, 2024
8527bfc
support select
yihaoDeng Dec 20, 2024
a64d13f
support select
yihaoDeng Dec 20, 2024
aa814ea
support select
yihaoDeng Dec 21, 2024
0f94ca2
support select
yihaoDeng Dec 21, 2024
9b367a1
support select
yihaoDeng Dec 22, 2024
a273f57
support select
yihaoDeng Dec 23, 2024
3f93cd6
support select
yihaoDeng Dec 23, 2024
ce229f7
support select
yihaoDeng Dec 23, 2024
b88940b
support select
yihaoDeng Dec 23, 2024
67796de
support select
yihaoDeng Dec 23, 2024
fc549f9
support select
yihaoDeng Dec 24, 2024
e086812
support select
yihaoDeng Dec 25, 2024
1041232
support select
yihaoDeng Dec 26, 2024
184731c
refactor code
yihaoDeng Dec 27, 2024
8be009a
refactor code
yihaoDeng Dec 27, 2024
65e3a99
refactor code
yihaoDeng Dec 27, 2024
23313f7
refactor code
yihaoDeng Dec 27, 2024
8d190b2
refactor code
yihaoDeng Dec 30, 2024
ff034b0
refactor code
yihaoDeng Dec 30, 2024
4cb784c
refactor code
yihaoDeng Dec 31, 2024
b3971fb
refactor code
yihaoDeng Dec 31, 2024
6ca5dcc
refactor code
yihaoDeng Dec 31, 2024
b2411b9
refactor code
yihaoDeng Dec 31, 2024
0059728
refactor code
yihaoDeng Dec 31, 2024
eb52ae3
refactor code
yihaoDeng Dec 31, 2024
30dffc7
refactor code
yihaoDeng Dec 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmake/cmake.platform
Original file line number Diff line number Diff line change
Expand Up @@ -218,5 +218,9 @@ else ()
endif()
MESSAGE(STATUS "DEPS_DIR: " ${TD_DEPS_DIR})

if(${TD_ACORE})
add_definitions(-DTD_ACORE)
endif(${TD_ACORE})

MESSAGE("C Compiler: ${CMAKE_C_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_C_COMPILER_VERSION})")
MESSAGE("CXX Compiler: ${CMAKE_CXX_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_CXX_COMPILER_VERSION})")
10 changes: 7 additions & 3 deletions include/common/tglobal.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ extern bool tsEnableWhiteList;
extern int64_t tsDndStart;
extern int64_t tsDndStartOsUptime;
extern int64_t tsDndUpTime;
extern int64_t tsDndStarted;

// dnode misc
extern uint32_t tsEncryptionKeyChksum;
Expand Down Expand Up @@ -243,6 +244,9 @@ extern int64_t tsmaDataDeleteMark;
// wal
extern int64_t tsWalFsyncDataSizeLimit;

// misc
extern bool tsAcoreOS;

// internal
extern int32_t tsTransPullupInterval;
extern int32_t tsCompactPullupInterval;
Expand Down Expand Up @@ -272,9 +276,9 @@ extern bool tsExperimental;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)

int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl,
SArray *pArgs);
const char *envFile, char *apolloUrl, SArray *pArgs, ELogMode mode);
int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs);
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs,
bool tsc);
void taosCleanupCfg();
Expand Down
188 changes: 188 additions & 0 deletions include/libs/transport/trpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
extern "C" {
#endif

#ifndef TD_ACORE
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
Expand Down Expand Up @@ -190,6 +191,193 @@ int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);
int32_t rpcUtilSWhiteListToStr(SIpWhiteList *pWhiteList, char **ppBuf);
int32_t rpcCvtErrCode(int32_t code);

#else
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#include "tmsg.h"
#include "ttrace.h"

#define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1
#define IsReq(pMsg) (pMsg->msgType & 1U)

extern int32_t tsRpcHeadSize;

typedef struct {
uint32_t clientIp;
uint16_t clientPort;
int64_t applyIndex;
uint64_t applyTerm;
char user[TSDB_USER_LEN];
} SRpcConnInfo;

typedef enum {
TD_ACORE_CLIENT = 1,
TD_ACORE_DSVR_CLIENT = 2,
TD_ACORE_DSVR_STA_CLIENT = 4,
TD_ACORE_DSVR_SYNC_CLIENT = 8,
TD_ACORE_DSVR = 16
} RPC_TYPE;

typedef struct SRpcHandleInfo {
// rpc info
void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server
int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp)
int8_t persistHandle; // persist handle or not
int8_t hasEpSet;
int32_t cliVer;

// app info
void *ahandle; // app handle set by client
void *wrapper; // wrapper handle
void *node; // node mgmt handle

// resp info
void *rsp;
int32_t rspLen;

STraceId traceId;

SRpcConnInfo conn;
int8_t forbiddenIp;
int8_t notFreeAhandle;
int8_t compressed;
int16_t connType;
int64_t seq;
int64_t qId;
int32_t msgType;
void *reqWithSem;
int32_t refIdMgt;
} SRpcHandleInfo;

typedef struct SRpcMsg {
tmsg_t msgType;
void *pCont;
int32_t contLen;
int32_t code;
int32_t type;
void *parent;
SRpcHandleInfo info;

} SRpcMsg;

typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcFFfp)(tmsg_t msgType);
typedef bool (*RpcNoDelayfp)(tmsg_t msgType);
typedef void (*RpcDfp)(void *ahandle);

typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
uint16_t localPort; // local port
char *label; // for debug purpose
int32_t numOfThreads; // number of threads to handle connections
int32_t sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int32_t idleTime; // milliseconds, 0 means idle timer is disabled
int32_t compatibilityVer;

int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor
int32_t retryMaxInterval; // retry max interval
int64_t retryMaxTimeout;

int32_t failFastThreshold;
int32_t failFastInterval;

int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not

// the following is for client app ecurity only
char *user; // user name

// call back to process incoming msg
RpcCfp cfp;

// retry not not for particular msg
RpcRfp rfp;

// set up timeout for particular msg
RpcTfp tfp;

// destroy client ahandle;
RpcDfp dfp;
// fail fast fp
RpcFFfp ffp;

RpcNoDelayfp noDelayFp;

int32_t connLimitNum;
int32_t connLimitLock;
int32_t timeToGetConn;
int8_t supportBatch; // 0: no batch, 1. batch
int32_t batchSize;
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
int32_t shareConnLimit;
int8_t shareConn; // 0: no share, 1. share
int8_t startReadTimer;
int64_t readTimeout; // s
void *parent;

} SRpcInit;

typedef struct {
void *val;
int32_t (*clone)(void *src, void **dst);
} SRpcCtxVal;

typedef struct {
int32_t msgType;
void *val;
int32_t (*clone)(void *src, void **dst);
} SRpcBrokenlinkVal;

typedef struct {
SHashObj *args;
SRpcBrokenlinkVal brokenVal;
void (*freeFunc)(const void *arg);
int64_t st;
} SRpcCtx;

int32_t rpcInit();
void rpcCleanup();

void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void rpcCloseImpl(void *);
void *rpcMallocCont(int64_t contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int64_t contLen);

// Because taosd supports multi-process mode
// These functions should not be used on the server side
// Please use tmsg<xx> functions, which are defined in tmsgcb.h
int32_t rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
int32_t rpcSendResponse(SRpcMsg *pMsg);
int32_t rpcRegisterBrokenLinkArg(SRpcMsg *msg);
int32_t rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock

// These functions will not be called in the child process
int32_t rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int32_t rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int32_t rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated,
int32_t timeoutMs);

int32_t rpcFreeConnById(void *shandle, int64_t connId);

int32_t rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
int32_t rpcAllocHandle(int64_t *refId);
int32_t rpcSetIpWhite(void *thandl, void *arg);

int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);

int32_t rpcUtilSWhiteListToStr(SIpWhiteList *pWhiteList, char **ppBuf);
int32_t rpcCvtErrCode(int32_t code);

#endif
#ifdef __cplusplus
}
#endif
Expand Down
22 changes: 11 additions & 11 deletions include/os/osSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@

// If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following section.
#ifndef ALLOW_FORBID_FUNC
#define socket SOCKET_FUNC_TAOS_FORBID
#define bind BIND_FUNC_TAOS_FORBID
#define listen LISTEN_FUNC_TAOS_FORBID
#define accept ACCEPT_FUNC_TAOS_FORBID
#define epoll_create EPOLL_CREATE_FUNC_TAOS_FORBID
#define epoll_ctl EPOLL_CTL_FUNC_TAOS_FORBID
#define epoll_wait EPOLL_WAIT_FUNC_TAOS_FORBID
#define inet_addr INET_ADDR_FUNC_TAOS_FORBID
#define inet_ntoa INET_NTOA_FUNC_TAOS_FORBID
#endif
// #ifndef ALLOW_FORBID_FUNC
// #define socket SOCKET_FUNC_TAOS_FORBID
// #define bind BIND_FUNC_TAOS_FORBID
// #define listen LISTEN_FUNC_TAOS_FORBID
// #define accept ACCEPT_FUNC_TAOS_FORBID
// #define epoll_create EPOLL_CREATE_FUNC_TAOS_FORBID
// #define epoll_ctl EPOLL_CTL_FUNC_TAOS_FORBID
// #define epoll_wait EPOLL_WAIT_FUNC_TAOS_FORBID
// #define inet_addr INET_ADDR_FUNC_TAOS_FORBID
// #define inet_ntoa INET_NTOA_FUNC_TAOS_FORBID
// #endif

#if defined(WINDOWS)
#if BYTE_ORDER == LITTLE_ENDIAN
Expand Down
8 changes: 7 additions & 1 deletion include/util/tlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ typedef enum {
DEBUG_FILE = 128
} ELogLevel;

typedef enum {
LOG_MODE_TAOSC = 1,
LOG_MODE_TAOSD = 2,
LOG_MODE_BOTH = 3, // LOG_MODE_TAOSC | LOG_MODE_TAOSD
} ELogMode;

typedef void (*LogFp)(int64_t ts, ELogLevel level, const char *content);

extern bool tsLogEmbedded;
Expand Down Expand Up @@ -73,7 +79,7 @@ extern int32_t simDebugFlag;

extern int32_t tqClientDebugFlag;
int32_t taosInitLogOutput(const char **ppLogName);
int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc);
int32_t taosInitLog(const char *logName, int32_t maxFiles, ELogMode mode);
void taosCloseLog();
void taosResetLog();
void taosDumpData(uint8_t *msg, int32_t len);
Expand Down
4 changes: 2 additions & 2 deletions source/client/src/clientEnv.c
Original file line number Diff line number Diff line change
Expand Up @@ -960,13 +960,13 @@ void taos_init_imp(void) {

const char *logName = CUS_PROMPT "slog";
ENV_ERR_RET(taosInitLogOutput(&logName), "failed to init log output");
if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
if (taosCreateLog(logName, 10, configDir, NULL, NULL, NULL, NULL, tsAcoreOS ? LOG_MODE_BOTH : LOG_MODE_TAOSC) != 0) {
(void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logName, strerror(errno), configDir);
tscInitRes = -1;
return;
}

ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg");
ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, tsAcoreOS ? 0 : 1), "failed to init cfg");

initQueryModuleMsgHandle();
ENV_ERR_RET(taosConvInit(), "failed to init conv");
Expand Down
2 changes: 1 addition & 1 deletion source/client/src/clientMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void taos_cleanup(void) {

tscInfo("all local resources released");
taosCleanupCfg();
taosCloseLog();
if (!tsAcoreOS) taosCloseLog();
}

static setConfRet taos_set_config_imp(const char *config) {
Expand Down
28 changes: 16 additions & 12 deletions source/common/src/tglobal.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ int32_t tsArbSetAssignedTimeoutSec = 30;
int64_t tsDndStart = 0;
int64_t tsDndStartOsUptime = 0;
int64_t tsDndUpTime = 0;
int64_t tsDndStarted = 0;

// dnode misc
uint32_t tsEncryptionKeyChksum = 0;
Expand Down Expand Up @@ -278,6 +279,9 @@ int32_t tsTtlFlushThreshold = 100; /* maximum number of dirty items in memory.
*/
int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch

// misc
bool tsAcoreOS = false;

// internal
int32_t tsTransPullupInterval = 2;
int32_t tsCompactPullupInterval = 10;
Expand Down Expand Up @@ -1731,24 +1735,26 @@ int32_t taosSetReleaseCfg(SConfig *pCfg);

static int32_t taosSetAllDebugFlag(SConfig *pCfg, int32_t flag);

static int8_t tsLogCreated = 0;

int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
const char *envFile, char *apolloUrl, SArray *pArgs, ELogMode mode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SConfig *pCfg = NULL;

if (atomic_val_compare_exchange_8(&tsLogCreated, 0, 1) != 0) return 0;

if (tsCfg == NULL) {
TAOS_CHECK_GOTO(osDefaultInit(), &lino, _exit);
}

TAOS_CHECK_GOTO(cfgInit(&pCfg), &lino, _exit);

if (tsc) {
tsLogEmbedded = 0;
TAOS_CHECK_GOTO(taosAddClientLogCfg(pCfg), &lino, _exit);
} else {
tsLogEmbedded = 1;
TAOS_CHECK_GOTO(taosAddClientLogCfg(pCfg), &lino, _exit);
tsLogEmbedded = (mode & LOG_MODE_TAOSC) ? 0 : 1;
TAOS_CHECK_GOTO(taosAddClientLogCfg(pCfg), &lino, _exit);

if (mode & LOG_MODE_TAOSD) {
TAOS_CHECK_GOTO(taosAddServerLogCfg(pCfg), &lino, _exit);
}

Expand All @@ -1762,10 +1768,8 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
goto _exit;
}

if (tsc) {
TAOS_CHECK_GOTO(taosSetClientLogCfg(pCfg), &lino, _exit);
} else {
TAOS_CHECK_GOTO(taosSetClientLogCfg(pCfg), &lino, _exit);
TAOS_CHECK_GOTO(taosSetClientLogCfg(pCfg), &lino, _exit);
if (mode & LOG_MODE_TAOSD) {
TAOS_CHECK_GOTO(taosSetServerLogCfg(pCfg), &lino, _exit);
}

Expand All @@ -1778,7 +1782,7 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
goto _exit;
}

if ((code = taosInitLog(logname, logFileNum, tsc)) != 0) {
if ((code = taosInitLog(logname, logFileNum, mode)) != 0) {
(void)printf("failed to init log file since %s\n", tstrerror(code));
goto _exit;
}
Expand Down
Loading