gouda/upsched/upsched.go
2024-12-07 23:54:44 +01:00

134 lines
3.7 KiB
Go

// Package upsched provides utilities for managing and scheduling multi-part
// uploads with a timeout mechanism. It allows appending chunks of data to
// a destination and automatically finalizes the upload if a specified
// timeout is reached.
package upsched
import (
"errors"
"fmt"
"io"
"mime/multipart"
"os"
"time"
"github.com/alphadose/haxmap"
"golang.org/x/exp/constraints"
)
const (
// AppendOpenFlags is the recommended flag set for opening a file to
// which chunks will be appended during the upload process.
AppendOpenFlags = os.O_APPEND | os.O_CREATE | os.O_WRONLY
)
// Key defines the set of types that can be used as keys in the Scheduler.
// It can be any integer or string type.
type Key interface {
constraints.Integer | ~string
}
// Scheduler manages the scheduling of multi-part uploads. It maintains
// a map of active uploads and handles the appending of chunks, as well as
// the automatic finalization of uploads based on a timeout.
type Scheduler[K Key] interface {
Prepare(k K, timeout time.Duration, cb func(K, error)) error
Append(k K, chunk multipart.File, dst io.Writer) error
Finish(k K) error
}
// upload holds the state for a single upload, including its timeout
// duration and an associated timer.
type upload struct {
timeout time.Duration
timer *time.Timer
}
// scheduler implements the Scheduler interface.
type scheduler[K Key] struct {
m *haxmap.Map[K, upload]
}
// NewScheduler creates a new Scheduler. It returns a Scheduler configured to
// manage uploads keyed by the specified type.
func NewScheduler[K Key]() Scheduler[K] {
return scheduler[K]{
m: haxmap.New[K, upload](),
}
}
// Prepare initializes an upload with the given key and timeout duration.
// If an upload with the specified key already exists, an error is returned.
// The provided callback function is called with the key and an error if the
// upload times out before it is finished. This function should be called
// before any chunks are appended.
//
// If the upload is successfully initialized, a timer is started based on the
// provided timeout duration. If the timer expires before the upload is
// finished, the callback function is invoked.
//
// Returns an error if the key already exists in the scheduler.
func (us scheduler[K]) Prepare(k K, timeout time.Duration, cb func(K, error)) error {
_, ok := us.m.Get(k)
if ok {
return errors.New("upload key already exists")
}
d := time.Second * time.Duration(timeout)
f := func() {
if _, ok := us.m.Get(k); ok {
err := us.Finish(k)
cb(k, err)
}
}
us.m.Set(
k,
upload{
timeout: timeout,
timer: time.AfterFunc(d, f),
},
)
return nil
}
// Append appends a chunk of data to the destination writer associated with
// the given key. It resets the upload's timer to the initial timeout duration
// upon a successful append. If the key does not exist, an error is returned.
//
// It is recommended to use AppendOpenFlags for actual files that are passed
// to this function.
func (us scheduler[K]) Append(k K, chunk multipart.File, dst io.Writer) error {
u, ok := us.m.Get(k)
if !ok {
return errors.New("upload key does not exist")
}
u.timer.Stop()
defer u.timer.Reset(u.timeout)
_, err := io.Copy(dst, chunk)
if err != nil {
return fmt.Errorf("unable to append chunk to destination file: %w", err)
}
return nil
}
// Finish finalizes the upload associated with the given key. It stops the
// associated timer and removes the upload from the scheduler's internal map.
// If the key does not exist, an error is returned.
func (us scheduler[K]) Finish(k K) error {
u, ok := us.m.Get(k)
if !ok {
return errors.New("upload key does not exist")
}
u.timer.Stop()
us.m.Del(k)
return nil
}