Golang实现异步上传文件支持进度条查询的方法
- 作者: 亖呉?盀
- 来源: 51数据库
- 2021-07-08
业务背景
业务需求要求开发一个异步上传文件的接口,并支持上传进度的查询。
需求分析
zip压缩包中,包含一个csv文件和一个图片文件夹,要求:解析csv数据存入mongo,将图片文件夹中的图片信息对应上csv中的人员信息。
zip压缩包解压
使用golang自带的 "archive/zip" 包解压。
func decompresszip(filepath, dest string) (string, string, error) {
var csvname string
imagefolder := path.base(filepath)
ext := path.ext(filepath)
foldername := strings.trimsuffix(imagefolder, ext)
src, err := os.open(filepath)
if err != nil {
return "", "", err
}
defer src.close()
zipfile, err := zip.openreader(src.name())
if err != nil {
return "", "", err
}
defer zipfile.close()
err = os.mkdirall(path.join(dest, foldername), os.modeperm)
for _, innerfile := range zipfile.file {
info := innerfile.fileinfo()
if info.isdir() {
continue
}
dst, err := os.create(path.join(dest, foldername, info.name()))
if err != nil {
fmt.println(err.error())
continue
}
src, err := innerfile.open()
if err != nil {
fmt.println(err.error())
continue
}
io.copy(dst, src)
}
destpath, err := ioutil.readdir(path.join(dest, foldername))
if err != nil {
return "", "", err
}
for _, v := range destpath {
if path.ext(v.name()) == ".csv" {
csvname = path.join(dest, foldername, v.name())
}
}
return foldername, csvname, nil
}
在这个解压的过程中,压缩包的树结构只能到2层
import.zip ┝┅┅import.csv ┖┅┅images
在解压后,所有的文件都会在同一个目录下,既images中的图片会变成和.csv文件同级
验证csv文件编码格式是否为utf-8
func validutf8(buf []byte) bool {
nbytes := 0
for i := 0; i < len(buf); i++ {
if nbytes == 0 {
if (buf[i] & 0x80) != 0 { //与操作之后不为0,说明首位为1
for (buf[i] & 0x80) != 0 {
buf[i] <<= 1 //左移一位
nbytes++ //记录字符共占几个字节
}
if nbytes < 2 || nbytes > 6 { //因为utf8编码单字符最多不超过6个字节
return false
}
nbytes-- //减掉首字节的一个计数
}
} else { //处理多字节字符
if buf[i]&0xc0 != 0x80 { //判断多字节后面的字节是否是10开头
return false
}
nbytes--
}
}
return nbytes == 0
}
后续支持utf-8转码
这个utf8编码判断方法是网上down下来的,后续优化一下
主逻辑
type linewrong struct {
linenumber int64 `json:"line_number"`
msg string `json:"msg"`
}
func import(/*自定义参数*/){
// decompress zip file to destination address
folder, csvname, err := decompress(path.join(constant.folderprefix, req.filepath), dest)
if err != nil {
fmt.println(err.error())
}
// check if the file encoding is utf8
b, err := ioutil.readfile(csvname)
if err != nil {
fmt.println(err.error())
}
if !utils.validutf8(b) {
fmt.println(errors.new("数据编码错误,请使用utf-8格式csv!"))
}
// create goroutine to analysis data into mongodb
var wg sync.waitgroup
wg.add(1)
// used to interrupt goroutine
resultchan := make(chan error)
// used to record wrong row in csv
lw := make(chan []linewrong)
go func(ctx *gin.context, name, csvpath, dir, folder string) {
defer wg.done()
tidt, cit, lwt, err := importcsv(ctx, name, csvpath, dir, folder)
resultchan <- err
if err != nil {
fmt.println(err.error())
}
lw <- lwt
if len(lwt) == 0 {
importclassdata(ctx, tidt, cit)
}
}(ctx, req.name, csvname, dest, folder)
err = <-resultchan
linewrong := <-lw
close(lw)
···
}
// pre-analysis data in csv and through wrong data with line numbers and information
func importcsv()(){
···
}
// analysis data again and save data into mongodb, if is there any error,through them same as import()
func importclassdata()(){
···
conn, err := connect()
if err != nil {
return err
}
defer conn.close()
conn.do("hset", taskid, "task_id", (curline*100)/totallines)
···
}
将错误信息以channel接收,使用 package "sync" 的 sync.waitgroup 控制异步协程。在入库的过程中,将当前的进度存入redis。
查询进度接口
func queryimport()(){
conn, err := connect()
if err != nil {
return nil, err
}
defer conn.close()
progress, _ := conn.do("hget", key, field)
if pro, ok := progress.([]uint8); ok {
ba := []byte{}
for _, b := range pro {
ba = append(ba, byte(b))
}
progress,_ = strconv.atoi(string(ba))
}
return progress
}
从redis中取出来的数据是[]uint8类型数据,先断言,然后转类型返回。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
推荐阅读
