第六天:网络处理(anet部分)-- redis源码慢慢学,慢慢看【redis6.0.6】

时间:2022-07-24
本文章向大家介绍第六天:网络处理(anet部分)-- redis源码慢慢学,慢慢看【redis6.0.6】,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

前奏

今天,我找回了阔别已久的迷路的感觉,我觉得,顺序什么的,算了算了,还是按照我的喜好来。 所以,这两天我决定学习网络模块。

网络模块的文件:

依旧要按照惯例放上人家的版权:

  • Copyright © 2006-2012, Salvatore Sanfilippo
  • All rights reserved.

初潮

anet.c和anet.h中的内容主要用于简单的TCP Socket连接。

先来看一下头文件,一半是熟悉的面孔,一半听我说完也是熟悉的面孔了嘿嘿

/* anet.h -- declarations for basic TCP / Unix-domain socket helpers.
 *
 * Functions taking a 'char *err' argument pass that buffer to the module's
 * error-reporting helper on failure. Status-style functions return ANET_OK
 * or ANET_ERR; connect-style functions return a socket descriptor or
 * ANET_ERR. */
#ifndef ANET_H
#define ANET_H

#include <sys/types.h>

#define ANET_OK 0
#define ANET_ERR -1
#define ANET_ERR_LEN 256

/* Flags used with certain functions. */
#define ANET_NONE 0
#define ANET_IP_ONLY (1<<0)

#if defined(__sun) || defined(_AIX)
#define AF_LOCAL AF_UNIX
#endif

#ifdef _AIX
#undef ip_len
#endif

/* TCP client connections. */
int anetTcpConnect(char *err, const char *addr, int port);	/* connect with no extra flags */
int anetTcpNonBlockConnect(char *err, const char *addr, int port);	/* non-blocking connect (per its name; definition not shown here) */
int anetTcpNonBlockBindConnect(char *err, const char *addr, int port, const char *source_addr);	/* non-blocking connect, binding the socket to source_addr first */
int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port, const char *source_addr);
/* "Best effort" binding: like anetTcpNonBlockBindConnect, but if no socket
 * could be obtained with source_addr the connect is retried without it. */

/* Unix-domain socket client connections. */
int anetUnixConnect(char *err, const char *path);
int anetUnixNonBlockConnect(char *err, const char *path);

int anetRead(int fd, char *buf, int count);
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len);	/* resolve host (name or numeric address) to a textual IP */
int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len);/* same, but only numeric addresses are accepted (AI_NUMERICHOST) */

int anetTcpServer(char *err, int port, char *bindaddr, int backlog);
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog);
int anetUnixServer(char *err, char *path, mode_t perm, int backlog);
int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port);
int anetUnixAccept(char *err, int serversock);

int anetWrite(int fd, char *buf, int count);
int anetNonBlock(char *err, int fd);		/* put fd in non-blocking mode (per its name; definition not shown here) */
int anetBlock(char *err, int fd);
int anetEnableTcpNoDelay(char *err, int fd);	/* presumably sets TCP_NODELAY; definition not shown here */
int anetDisableTcpNoDelay(char *err, int fd);
int anetTcpKeepAlive(char *err, int fd);	/* enable SO_KEEPALIVE on fd */

int anetSendTimeout(char *err, int fd, long long ms);
int anetRecvTimeout(char *err, int fd, long long ms);

int anetPeerToString(int fd, char *ip, size_t ip_len, int *port);	/* peer address as text plus host-byte-order port */
int anetKeepAlive(char *err, int fd, int interval);
int anetSockName(int fd, char *ip, size_t ip_len, int *port);

/* Address formatting helpers (definitions not shown in this excerpt). */
int anetFormatAddr(char *fmt, size_t fmt_len, char *ip, int port);
int anetFormatPeer(int fd, char *fmt, size_t fmt_len);
int anetFormatSock(int fd, char *fmt, size_t fmt_len);

#endif /* ANET_H */

高潮迭起

工具包

参考资料:《通过域名获取主机IP - struct addrinfo》、《SOCKET网络编程 - 通俗易懂入门篇》、《setsockopt与getsockopt》

anet.c

anetTcpConnect

/* Open a TCP connection to addr:port with no connect flags and no explicit
 * source address. The result of anetTcpGenericConnect() is returned as is. */
int anetTcpConnect(char *err, const char *addr, int port)
{
    const char *no_source_addr = NULL;

    return anetTcpGenericConnect(err, addr, port, no_source_addr,
                                 ANET_CONNECT_NONE);
}
/* Generic TCP client connect shared by the anetTcp*Connect wrappers.
 *
 *   err         - buffer handed to anetSetError() when something fails.
 *   addr, port  - remote endpoint. It is resolved with getaddrinfo() and
 *                 every returned address is tried in turn.
 *   source_addr - optional local address the socket is bind()-ed to before
 *                 connecting; NULL means no explicit bind.
 *   flags       - ANET_CONNECT_NONBLOCK: switch the socket to non-blocking
 *                 mode before connect().
 *                 ANET_CONNECT_BE_BINDING ("best effort binding"): if no
 *                 socket could be obtained while using source_addr, retry
 *                 once without it.
 *
 * Returns the socket descriptor on success, ANET_ERR otherwise. With
 * ANET_CONNECT_NONBLOCK a connect() failing with EINPROGRESS counts as
 * success and the still-connecting socket is returned. */
static int anetTcpGenericConnect(char *err, const char *addr, int port,const char *source_addr, int flags)
{
    int s = ANET_ERR, rv;
    int saved_errno;
    char portstr[6];  /* strlen("65535") + 1; */
    struct addrinfo hints, *servinfo, *bservinfo, *p, *b;

    snprintf(portstr,sizeof(portstr),"%d",port);
    memset(&hints,0,sizeof(hints));
    hints.ai_family = AF_UNSPEC;      /* address family left unspecified */
    hints.ai_socktype = SOCK_STREAM;  /* stream (TCP) sockets only */

    /* Resolve the remote endpoint into a linked list of candidates. */
    if ((rv = getaddrinfo(addr,portstr,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        /* Try to create the socket and to connect it.
         * If we fail in the socket() call, or on connect(), we retry with
         * the next entry in servinfo. */
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;   /* s is a client socket; on failure try the next address */
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK)
            goto error;
        if (source_addr) {
            int bound = 0;
            /* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */
            if ((rv = getaddrinfo(source_addr, NULL, &hints, &bservinfo)) != 0)
            {
                anetSetError(err, "%s", gai_strerror(rv));
                goto error;
            }
            /* Bind to the first local address that accepts the bind. */
            for (b = bservinfo; b != NULL; b = b->ai_next) {
                if (bind(s,b->ai_addr,b->ai_addrlen) != -1) {
                    bound = 1;
                    break;
                }
            }
            /* Keep the errno left by bind(): freeaddrinfo() is not
             * guaranteed to leave it untouched. */
            saved_errno = errno;
            freeaddrinfo(bservinfo);
            errno = saved_errno;
            if (!bound) {
                anetSetError(err, "bind: %s", strerror(errno));
                goto error;
            }
        }   /* end of the optional source_addr bind */
        if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
            /* If the socket is non-blocking, it is ok for connect() to
             * return an EINPROGRESS error here. */
            if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK)
                goto end;
            /* Preserve the connect() errno across close() so the message
             * emitted after the loop reports the real failure. */
            saved_errno = errno;
            close(s);
            errno = saved_errno;
            s = ANET_ERR;
            continue;
        }

        /* If we ended an iteration of the for loop without errors, we
         * have a connected socket. Let's return to the caller. */
        goto end;
    }
    if (p == NULL)
        anetSetError(err, "creating socket: %s", strerror(errno));

error:
    if (s != ANET_ERR) {
        close(s);
        s = ANET_ERR;
    }

end:
    freeaddrinfo(servinfo);

    /* Handle best effort binding: if a binding address was used, but it is
     * not possible to create a socket, try again without a binding address. */
    if (s == ANET_ERR && source_addr && (flags & ANET_CONNECT_BE_BINDING)) {
        return anetTcpGenericConnect(err,addr,port,NULL,flags);
    } else {
        return s;
    }
}
/* Non-blocking TCP connect to addr:port; source_addr is forwarded to
 * anetTcpGenericConnect(), which binds the socket to it before connecting. */
int anetTcpNonBlockBindConnect(char *err, const char *addr, int port,const char *source_addr)
{
    int connect_flags = ANET_CONNECT_NONBLOCK;

    return anetTcpGenericConnect(err, addr, port, source_addr, connect_flags);
}

/* Non-blocking TCP connect with "best effort" source binding: both
 * ANET_CONNECT_NONBLOCK and ANET_CONNECT_BE_BINDING are passed down to
 * anetTcpGenericConnect(). */
int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port,const char *source_addr)
{
    int connect_flags = ANET_CONNECT_NONBLOCK | ANET_CONNECT_BE_BINDING;

    return anetTcpGenericConnect(err, addr, port, source_addr, connect_flags);
}

潮落

anetUnixConnect

/* Connect to the Unix-domain socket at 'path' with no connect flags. */
int anetUnixConnect(char *err, const char *path)
{
    int connect_flags = ANET_CONNECT_NONE;

    return anetUnixGenericConnect(err, path, connect_flags);
}
/* Connect to the Unix-domain stream socket at 'path'.
 *
 * With ANET_CONNECT_NONBLOCK in 'flags' the socket is made non-blocking
 * before connect(), and a connect() failing with EINPROGRESS is treated as
 * success. Returns the socket descriptor, or ANET_ERR after reporting the
 * problem through 'err'. A path longer than sun_path is truncated. */
int anetUnixGenericConnect(char *err, const char *path, int flags)
{
    int s;
    struct sockaddr_un sa;

    if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR)
        return ANET_ERR;

    /* Zero the whole address first: strncpy() below copies at most
     * sizeof(sun_path)-1 bytes and does not write a terminator when the path
     * is that long, so the final byte must already be NUL. */
    memset(&sa,0,sizeof(sa));
    sa.sun_family = AF_LOCAL;
    strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1);
    if (flags & ANET_CONNECT_NONBLOCK) {
        if (anetNonBlock(err,s) != ANET_OK) {
            close(s);
            return ANET_ERR;
        }
    }
    if (connect(s,(struct sockaddr*)&sa,sizeof(sa)) == -1) {
        /* A non-blocking connect that is still in progress is not an error. */
        if (errno == EINPROGRESS &&
            flags & ANET_CONNECT_NONBLOCK)
            return s;

        anetSetError(err, "connect: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return s;
}

/* Create a SOCK_STREAM socket in 'domain' and run anetSetReuseAddr() on it.
 * Returns the descriptor, or ANET_ERR (closing the socket if it was already
 * created). */
static int anetCreateSocket(char *err, int domain) {
    int fd = socket(domain, SOCK_STREAM, 0);

    if (fd == -1) {
        anetSetError(err, "creating socket: %s", strerror(errno));
        return ANET_ERR;
    }

    /* Make sure connection-intensive things like the redis benchmark
     * will be able to close/open sockets a zillion of times */
    if (anetSetReuseAddr(err, fd) != ANET_ERR)
        return fd;

    close(fd);
    return ANET_ERR;
}
/* Connect to the Unix-domain socket at 'path' in non-blocking mode. */
int anetUnixNonBlockConnect(char *err, const char *path)
{
    int connect_flags = ANET_CONNECT_NONBLOCK;

    return anetUnixGenericConnect(err, path, connect_flags);
}

Read、Write

/* Like read(2), but keep reading until exactly 'count' bytes have been
 * stored in 'buf', end-of-file is reached, or read() fails.
 * Returns the number of bytes read (less than 'count' only on EOF), or -1
 * as soon as a read() call returns -1. */
int anetRead(int fd, char *buf, int count)
{
    ssize_t remaining = count;
    char *cursor = buf;

    while (remaining != 0) {
        ssize_t chunk = read(fd, cursor, remaining);

        if (chunk == -1) return -1;
        if (chunk == 0) break;          /* EOF: report what we got so far */
        cursor += chunk;
        remaining -= chunk;
    }
    return count - remaining;
}
/* Like write(2), but keep writing until all 'count' bytes of 'buf' have
 * been written, write() returns 0, or write() fails.
 * Returns the number of bytes written, or -1 as soon as a write() call
 * returns -1. */
int anetWrite(int fd, char *buf, int count)
{
    ssize_t remaining = count;
    char *cursor = buf;

    while (remaining != 0) {
        ssize_t chunk = write(fd, cursor, remaining);

        if (chunk == -1) return -1;
        if (chunk == 0) break;          /* nothing accepted: stop here */
        cursor += chunk;
        remaining -= chunk;
    }
    return count - remaining;
}

这俩已经太熟了,也不知道要说啥

时间、心跳,相关

/* Apply a send timeout of 'ms' milliseconds to 'fd' through the SO_SNDTIMEO
 * socket option; passing zero for 'ms' disables the timeout.
 * Returns ANET_OK, or ANET_ERR after reporting the failure through 'err'. */
int anetSendTimeout(char *err, int fd, long long ms) {
    struct timeval timeout;

    /* Split milliseconds into whole seconds plus leftover microseconds. */
    timeout.tv_sec = ms/1000;
    timeout.tv_usec = (ms%1000)*1000;

    if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)) != -1)
        return ANET_OK;

    anetSetError(err, "setsockopt SO_SNDTIMEO: %s", strerror(errno));
    return ANET_ERR;
}

/* Apply a receive timeout of 'ms' milliseconds to 'fd' through the
 * SO_RCVTIMEO socket option; passing zero for 'ms' disables the timeout.
 * Returns ANET_OK, or ANET_ERR after reporting the failure through 'err'. */
int anetRecvTimeout(char *err, int fd, long long ms) {
    struct timeval timeout;

    /* Split milliseconds into whole seconds plus leftover microseconds. */
    timeout.tv_sec = ms/1000;
    timeout.tv_usec = (ms%1000)*1000;

    if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) != -1)
        return ANET_OK;

    anetSetError(err, "setsockopt SO_RCVTIMEO: %s", strerror(errno));
    return ANET_ERR;
}

/* Turn on the SO_KEEPALIVE socket option for 'fd'.
 * Returns ANET_OK, or ANET_ERR after reporting the failure through 'err'. */
int anetTcpKeepAlive(char *err, int fd)
{
    int enabled = 1;

    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled)) != -1)
        return ANET_OK;

    anetSetError(err, "setsockopt SO_KEEPALIVE: %s", strerror(errno));
    return ANET_ERR;
}

高潮再起

anetResolve

/* Resolve 'host' into a textual IP address stored in 'ipbuf'. ANET_NONE is
 * passed as flags, so anetGenericResolve() does not restrict the lookup to
 * numeric addresses. */
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len) {
    int resolve_flags = ANET_NONE;

    return anetGenericResolve(err, host, ipbuf, ipbuf_len, resolve_flags);
}
/* Resolve 'host' with getaddrinfo() and write the first result, formatted
 * as a textual IPv4 or IPv6 address, into 'ipbuf' (capacity 'ipbuf_len').
 * When 'flags' contains ANET_IP_ONLY the lookup uses AI_NUMERICHOST.
 * Returns ANET_OK, or ANET_ERR after reporting the getaddrinfo() error
 * through 'err'. */
int anetGenericResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len,int flags)
{
    struct addrinfo query, *result;
    int gai_rc;

    memset(&query,0,sizeof(query));
    query.ai_family = AF_UNSPEC;
    query.ai_socktype = SOCK_STREAM;  /* specify socktype to avoid dups */
    if (flags & ANET_IP_ONLY) query.ai_flags = AI_NUMERICHOST;

    gai_rc = getaddrinfo(host, NULL, &query, &result);
    if (gai_rc != 0) {
        anetSetError(err, "%s", gai_strerror(gai_rc));
        return ANET_ERR;
    }

    /* Only the first entry of the result list is used. */
    if (result->ai_family != AF_INET) {
        struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)result->ai_addr;
        inet_ntop(AF_INET6, &(addr6->sin6_addr), ipbuf, ipbuf_len);
    } else {
        struct sockaddr_in *addr4 = (struct sockaddr_in *)result->ai_addr;
        inet_ntop(AF_INET, &(addr4->sin_addr), ipbuf, ipbuf_len);
    }

    freeaddrinfo(result);
    return ANET_OK;
}

好滴吧,没有前面那个那么猛烈

/* Same as anetResolve() but passes ANET_IP_ONLY, which makes
 * anetGenericResolve() look 'host' up with AI_NUMERICHOST. */
int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len) {
    int resolve_flags = ANET_IP_ONLY;

    return anetGenericResolve(err, host, ipbuf, ipbuf_len, resolve_flags);
}

对端地址转字符串(端口由网络字节序转换为主机字节序)

/* Describe the peer connected to socket 'fd'.
 *
 *   ip, ip_len - optional output buffer and its capacity. It receives the
 *                textual IPv4/IPv6 address, or "/unixsocket" for an AF_UNIX
 *                peer.
 *   port       - optional output; receives the peer port in host byte
 *                order, or 0 for an AF_UNIX peer.
 *
 * Returns 0 on success. On failure (getpeername() error, ip_len == 0, or an
 * unsupported address family) returns -1, sets 'ip' to "?" when there is
 * room for it (or to the empty string when ip_len == 1) and '*port' to 0. */
int anetPeerToString(int fd, char *ip, size_t ip_len, int *port) {
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);

    if (getpeername(fd,(struct sockaddr*)&sa,&salen) == -1) goto error;
    if (ip_len == 0) goto error;

    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin_port);
    } else if (sa.ss_family == AF_INET6) {
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    } else if (sa.ss_family == AF_UNIX) {
        /* snprintf() always NUL-terminates (ip_len is non-zero here), even
         * when the buffer is too small for the whole placeholder. */
        if (ip) snprintf(ip,ip_len,"%s","/unixsocket");
        if (port) *port = 0;
    } else {
        goto error;
    }
    return 0;

error:
    if (ip) {
        if (ip_len >= 2) {
            ip[0] = '?';
            ip[1] = '\0';
        } else if (ip_len == 1) {
            ip[0] = '\0';
        }
    }
    if (port) *port = 0;
    return -1;
}

好累啊,那要不就到这儿吧,其他的基本是调用调用再调用,工具包里有。