Skip to content

Commit c9a2d2f

Browse files
committed
feat(transcode): lock the destination transcode cache path
1 parent 7eaf602 commit c9a2d2f

File tree

2 files changed

+60
-2
lines changed

2 files changed

+60
-2
lines changed

transcode/transcode_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ package transcode_test
22

33
import (
44
"bytes"
5+
"context"
56
"io"
67
"net/http"
78
"net/http/httptest"
89
"os"
910
"os/exec"
11+
"sync"
12+
"sync/atomic"
1013
"testing"
1114
"time"
1215

@@ -100,3 +103,41 @@ func TestTranscodeWithSeek(t *testing.T) {
100103
// since we seeked 2 seconds, we should have 5-2 = 3 seconds of PCM data
101104
require.Equal(t, (testFileLen-seekSecs)*bytesPerSec, buf.Len())
102105
}
106+
107+
func TestCachingParallelism(t *testing.T) {
108+
t.Parallel()
109+
110+
var realTranscodeCount atomic.Uint64
111+
transcoder := callbackTranscoder{
112+
transcoder: transcode.NewFFmpegTranscoder(),
113+
callback: func() { realTranscodeCount.Add(1) },
114+
}
115+
116+
cacheTranscoder := transcode.NewCachingTranscoder(transcoder, t.TempDir())
117+
118+
var wg sync.WaitGroup
119+
for i := 0; i < 5; i++ {
120+
wg.Add(1)
121+
go func() {
122+
defer wg.Done()
123+
124+
var buf bytes.Buffer
125+
require.NoError(t, cacheTranscoder.Transcode(context.Background(), transcode.PCM16le, "testdata/5s.flac", &buf))
126+
require.Equal(t, 5*bytesPerSec, buf.Len())
127+
}()
128+
}
129+
130+
wg.Wait()
131+
132+
require.Equal(t, 1, int(realTranscodeCount.Load()))
133+
}
134+
135+
type callbackTranscoder struct {
136+
transcoder transcode.Transcoder
137+
callback func()
138+
}
139+
140+
func (ct callbackTranscoder) Transcode(ctx context.Context, profile transcode.Profile, in string, out io.Writer) error {
141+
ct.callback()
142+
return ct.transcoder.Transcode(ctx, profile, in, out)
143+
}

transcode/transcoder_caching.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ import (
77
"io"
88
"os"
99
"path/filepath"
10+
"sync"
1011
)
1112

1213
const perm = 0o644
1314

1415
type CachingTranscoder struct {
1516
cachePath string
1617
transcoder Transcoder
18+
locks keyedMutex
1719
}
1820

1921
var _ Transcoder = (*CachingTranscoder)(nil)
@@ -38,8 +40,10 @@ func (t *CachingTranscoder) Transcode(ctx context.Context, profile Profile, in s
3840
}
3941

4042
key := cacheKey(name, args)
41-
path := filepath.Join(t.cachePath, key)
43+
unlock := t.locks.Lock(key)
44+
defer unlock()
4245

46+
path := filepath.Join(t.cachePath, key)
4347
cf, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o644)
4448
if err != nil {
4549
return fmt.Errorf("open cache file: %w", err)
@@ -51,7 +55,8 @@ func (t *CachingTranscoder) Transcode(ctx context.Context, profile Profile, in s
5155
return nil
5256
}
5357

54-
if err := t.transcoder.Transcode(ctx, profile, in, io.MultiWriter(out, cf)); err != nil {
58+
dest := io.MultiWriter(out, cf)
59+
if err := t.transcoder.Transcode(ctx, profile, in, dest); err != nil {
5560
os.Remove(path)
5661
return fmt.Errorf("internal transcode: %w", err)
5762
}
@@ -69,3 +74,15 @@ func cacheKey(cmd string, args []string) string {
6974
}
7075
return fmt.Sprintf("%x", sum.Sum(nil))
7176
}
77+
78+
type keyedMutex struct {
79+
sync.Map
80+
}
81+
82+
func (km *keyedMutex) Lock(key string) func() {
83+
value, _ := km.LoadOrStore(key, &sync.Mutex{})
84+
mu := value.(*sync.Mutex)
85+
mu.Lock()
86+
// TODO: remove key entry from map to save some space?
87+
return mu.Unlock
88+
}

0 commit comments

Comments
 (0)