Go实现文件分片上传

2022-07-25 17:42:55

Go语言在写HTTP服务程序时,会经常用到文件上传和文件下载,文件上传和文件下载都可以用http包,默认的功能基本上够用了。http包支持文件下载的断点续传和进度显示,文件上传貌似不支持断点续传,不知...

Go语言在写HTTP服务程序时,会经常用到文件上传和文件下载,文件上传和文件下载都可以用http包,默认的功能基本上够用了。http包支持文件下载的断点续传和进度显示,文件上传貌似不支持断点续传,不知道是不是要web端来实现。

然后我自己配合web端实现了大文件分片上传,来完成断点续传和进度显示的功能。

基本思想是,在web端将上传的文件进行分片处理,然后向服务端发送上传请求(UploadRequest)包括文件名,MD5,文件大小,和文件总片数。
然后开始一片一片的上传(Upload)。如果暂停了或者断网失败了,就记录当前上传片的片数,下一次上传,重发UploadRequest,chunkPos为续传位置片数,然后从这个记录的片数开始接着上传(Upload)。在服务端,记录首次请求的文件名,MD5,文件大小,文件总片数,然后开始接收每一片,将每一个片保存为一个单独的文件,在接收完最后一个片时,将所有的分片份文件进行合并。保存为对应的文件名。最后再校验MD5码。

对于不同用户的请求,将文件上传的信息存储到session中,当前文件上传暂停后,上传其它文件时,向session增加新文件的信息。暂停后,接着上传最开始的文件,则直接从session中取得该文件的信息,包括文件名,上传到某一个片的信息等。

服务端实现代码:

func UploadRequest(w http.ResponseWriter, r *http.Request) {
 log.WithFields(log.Fields{
  "HTTP": r.Method,
  "FUNC": "UploadRequest",
 }).Info("HTTP REQUEST")

 header := w.Header()
 header.Add("Content-Type", "application/json")

 /* session authentication */
 sess := session.GlobalSessions.SessionCheck(w, r)
 if sess == nil {
  log.Error("sess check error")
  fmt.Fprintf(w, config.FmtStr, "3000", "error", "session check error")
  return
 }

 defer r.Body.Close()
 con, _ := ioutil.ReadAll(r.Body) //获取post的body数据

 log.Info("UploadRequest json: ", string(con))

 var uploadReq uploadRequest
 err := json.Unmarshal([]byte(con), &uploadReq) /* 解析json字符串数据到结构体中 */
 if err != nil {
  log.Error("json unmarshal error")
  fmt.Fprintf(w, config.FmtStr, "2000", "error", "json unmarshal error")
  return
 }

 switch uploadReq.Option {
 case "reUploadFile":
  {
   sess.Set("currentFile", uploadReq)
   err := os.Remove("./tmp/" + uploadReq.FileName + "/" + uploadReq.FileName + "_" + uploadReq.ChunkPos)
   if err != nil {
    log.Error(err)
   }
   fmt.Fprintf(w, config.FmtStr, "1000", "success", "reupload request success")
  }
 case "uploadFile":
  {
   err = os.Mkdir("./tmp/"+uploadReq.FileName, 0777)
   if err != nil {
    log.Info(err)
    fmt.Fprintf(w, config.FmtStr, "3000", "error", "mkdir error")
   } else {
    log.Infof("create dir %s ok\r\n", uploadReq.FileName)
   }
   // 文件上传信息保存,保存到session中,用于分片续传时使用
   sess.Set(uploadReq.FileName, uploadReq)
   sess.Set("currentFile", uploadReq)

   fmt.Fprintf(w, config.FmtStr, "1000", "success", "upload request success")
  }
 case "uploadCancel":
  {
   err = os.RemoveAll("./tmp/" + uploadReq.FileName)
   if err != nil {
    log.Fatal(err)
   }

   sess.Delete(uploadReq.FileName)
   fmt.Fprintf(w, config.FmtStr, "1000", "success", "upload file success")
   return
  }
 default:
  {
   fmt.Fprintf(w, config.FmtStr, "2000", "error", "upload request option error")
  }
 }

}

/* 分片上传功能实现 */
func Upload(w http.ResponseWriter, r *http.Request) {
 log.WithFields(log.Fields{
  "HTTP": r.Method,
  "FUNC": "Upload",
 }).Info("HTTP REQUEST")

 header := w.Header()
 header.Add("Content-Type", "application/json")

 /* session authentication */
 sess := globalSessions.SessionCheck(w, r) //session 检查的接口封装
 if sess == nil {
  log.Error("session check failed")
  fmt.Fprintf(w, config.FmtStr, "3000", "error", "session check failed")
  return
 }
 current_dir := sess.Get("current_dir")
 log.Info("current dir: ", current_dir)
 up_dir := sess.Get("up_dir")
 log.Info("up_dir: ", up_dir)

 // var current_dir string = "./river/test"
 /* 表单上传文件 */
 if r.Method == "GET" {
  crutime := time.Now().Unix()
  h := md5.New()
  io.WriteString(h, strconv.FormatInt(crutime, 10))
  token := fmt.Sprintf("%x", h.Sum(nil))

  t, _ := template.ParseFiles("upload.gtpl")
  t.Execute(w, token)
 } else {
  r.ParseMultipartForm(32 << 20)
  file, _, err := r.FormFile("uploadfile")
  if err != nil {
   log.Error(err)
   return
  }
  defer file.Close()

  var uploadFileInfo uploadRequest
  uploadFileInfo = sess.Get("currentFile").(uploadRequest)

  // fmt.Fprintf(w, "%v", sess.Get("fileName"))
  var fileName string = uploadFileInfo.FileName + "_" + uploadFileInfo.ChunkPos

  /* 判断上传的文件是否已经存在,可能发生同名的情况 */
  log.Debug("fileName: ", fileName)
  _, err = os.Stat("./tmp/" + uploadFileInfo.FileName + "/" + fileName)
  var f *os.File
  if err == nil {
   fmt.Fprintf(w, config.FmtStr, "3000", "error", "file is exsited")
   return
  } else {
   f, err = os.OpenFile("./tmp/"+uploadFileInfo.FileName+"/"+fileName, os.O_WRONLY|os.O_CREATE, 0666) // 此处假设当前目录下已存在test目录
  }

  defer f.Close()

  if err != nil {
   log.Error(err)
   fmt.Fprintf(w, config.FmtStr, "3000", "error", "create file error")
   return
  }
  io.Copy(f, file) //这里进行大文件copy会导致内存占用过大。一段时候后会进行GC回收
  log.Info("copy ok")

  // 判断是否单个文件是否上传完毕,上传完毕则合并文件到目标文件夹
  if uploadFileInfo.ChunkPos == uploadFileInfo.ChunkNum {
   f.Close()
   _, err = os.Stat(current_dir.(string) + uploadFileInfo.FileName)
   var fii *os.File
   if err == nil {
    // 如果已经存在这个文件了,则在当前文件名后增加日期
    time := time.Now().Format("2006-01-02_15-04-05")
    log.Debug("Time: ", time)

    var fileName []byte = []byte(currjsent_dir.(string) + uploadFileInfo.FileName)
    var fileNameFinal []byte = fileName
    log.Debug("file_name: ", string(fileName))
    var preFileName []byte
    var sufFileName []byte
    for i := len(fileName) - 1; i > 0; i-- {
     if fileName[i] == '.' {
      preFileName = fileName[:i] // 文件名前缀 test
      sufFileName = fileName[i:] // 文件名后缀 .txt
      fileNameFinal = []byte(string(preFileName) + "_" + time + string(sufFileName))
      log.Info("fileNameFile: ", fileNameFinal)
      break
     }
    }

    fii, err = os.OpenFile(string(fileNameFinal), os.O_WRONLY|os.O_CREATE, 0777) // 此处假设当前目录下已存在test目录
   } else {
    fii, err = os.OppythonenFile(current_dir.(string)+uploadFileInfo.FileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, os.ModePerm)
   }
   defer fii.Close()

   if err != nil {
    log.Error(err)
    fmt.Fprintf(w, config.FmtStr, "3000", "error", "Open object file error")
    return
   }
   index, _ := strconv.Atoi(uploadFileInfo.ChunkNum)
   for i := 1; i <= index; i++ {
    f11, err := os.OpenFile("./tmp/"+uploadFileInfo.FileName+"/"+uploadFileInfo.FileName+"_"+strconv.Itoa(int(i)), os.O_RDONLY, os.ModePerm)
    if err != nil {
     log.Error(err)
     fmt.Fprintf(w, config.FmtStr, "3000", "error", "Open slice file error")
     return
    }
    b, err := ioutil.ReadAll(f11)
    if err != nil {
     log.Error(err)
     fmt.Fprintf(w, config.FmtStr, "3000", "error", "ioutil readall error")
     return
    }
    fii.Write(b)
    f11.Close()
   }

   err = os.RemoveAll("./tmp/" + uploadFileInfo.FileName)
   if err != nil {
    log.Error(err)
   }

   sess.Delete(uploadFileInfo.FileName)

   fii.Close()
   if uploadFileInfo.MD5 == FileMD5(fii.Name()) {
    err := os.Remove(current_dir.(string) + fii.Name())
    if err != nil {
     log.Error(err)
    }
    fmt.Fprintf(w, config.FmtStr, "3000", "error", "upload file md5 error")
   } else {
    fmt.Fprintf(w, config.FmtStr, "1000", "success", "upload one file all slice success")
   }
   return
  }

  chunkPos, err := strconv.Atoi(uploadFileInfo.ChunkPos)
  if err != nil {
   log.Error(err)
  }
  uploadFileInfo.ChunkPos = strconv.Itoa(chunkPos + 1)
  sess.Set(uploadFileInfo.FileName, uploadFileInfo)
  sess.Set("currentF编程ile", uploadFileInfo)

  fmt.Fprintf(w, config.FmtStr, "1000", "success", "upload file success")
 }
}

func FileMD5(file string) string {
 f, err := os.Open(file)
 defer f.Close()
 if err != nil {
  log.Info(err)
  return ""
 } buffer, _ := ioutil.ReadAll(f)
 data := buffer
 has := md5.Sum(data)
 md5str := fmt.Sprintf("%x", has)
 return md5str
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。