sqlstore.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. package sqlstore
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "strings"
  10. "testing"
  11. "time"
  12. "github.com/grafana/grafana/pkg/bus"
  13. "github.com/grafana/grafana/pkg/log"
  14. m "github.com/grafana/grafana/pkg/models"
  15. "github.com/grafana/grafana/pkg/registry"
  16. "github.com/grafana/grafana/pkg/services/annotations"
  17. "github.com/grafana/grafana/pkg/services/cache"
  18. "github.com/grafana/grafana/pkg/services/sqlstore/migrations"
  19. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
  20. "github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
  21. "github.com/grafana/grafana/pkg/setting"
  22. "github.com/grafana/grafana/pkg/util"
  23. "github.com/go-sql-driver/mysql"
  24. "github.com/go-xorm/xorm"
  25. _ "github.com/grafana/grafana/pkg/tsdb/mssql"
  26. _ "github.com/lib/pq"
  27. sqlite3 "github.com/mattn/go-sqlite3"
  28. )
  29. var (
  30. x *xorm.Engine
  31. dialect migrator.Dialect
  32. sqlog log.Logger = log.New("sqlstore")
  33. )
  34. const ContextSessionName = "db-session"
  35. func init() {
  36. registry.Register(&registry.Descriptor{
  37. Name: "SqlStore",
  38. Instance: &SqlStore{},
  39. InitPriority: registry.High,
  40. })
  41. }
  42. type SqlStore struct {
  43. Cfg *setting.Cfg `inject:""`
  44. Bus bus.Bus `inject:""`
  45. CacheService *cache.CacheService `inject:""`
  46. dbCfg DatabaseConfig
  47. engine *xorm.Engine
  48. log log.Logger
  49. Dialect migrator.Dialect
  50. skipEnsureAdmin bool
  51. }
  52. // NewSession returns a new DBSession
  53. func (ss *SqlStore) NewSession() *DBSession {
  54. return &DBSession{Session: ss.engine.NewSession()}
  55. }
  56. // WithDbSession calls the callback with an session attached to the context.
  57. func (ss *SqlStore) WithDbSession(ctx context.Context, callback dbTransactionFunc) error {
  58. sess, err := startSession(ctx, ss.engine, false)
  59. if err != nil {
  60. return err
  61. }
  62. return callback(sess)
  63. }
  64. // WithTransactionalDbSession calls the callback with an session within a transaction
  65. func (ss *SqlStore) WithTransactionalDbSession(ctx context.Context, callback dbTransactionFunc) error {
  66. return ss.inTransactionWithRetryCtx(ctx, callback, 0)
  67. }
  68. func (ss *SqlStore) inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
  69. sess, err := startSession(ctx, ss.engine, true)
  70. if err != nil {
  71. return err
  72. }
  73. defer sess.Close()
  74. err = callback(sess)
  75. // special handling of database locked errors for sqlite, then we can retry 3 times
  76. if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
  77. if sqlError.Code == sqlite3.ErrLocked {
  78. sess.Rollback()
  79. time.Sleep(time.Millisecond * time.Duration(10))
  80. sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
  81. return ss.inTransactionWithRetryCtx(ctx, callback, retry+1)
  82. }
  83. }
  84. if err != nil {
  85. sess.Rollback()
  86. return err
  87. } else if err = sess.Commit(); err != nil {
  88. return err
  89. }
  90. if len(sess.events) > 0 {
  91. for _, e := range sess.events {
  92. if err = bus.Publish(e); err != nil {
  93. log.Error(3, "Failed to publish event after commit. error: %v", err)
  94. }
  95. }
  96. }
  97. return nil
  98. }
  99. func (ss *SqlStore) Init() error {
  100. ss.log = log.New("sqlstore")
  101. ss.readConfig()
  102. engine, err := ss.getEngine()
  103. if err != nil {
  104. return fmt.Errorf("Fail to connect to database: %v", err)
  105. }
  106. ss.engine = engine
  107. ss.Dialect = migrator.NewDialect(ss.engine)
  108. // temporarily still set global var
  109. x = engine
  110. dialect = ss.Dialect
  111. migrator := migrator.NewMigrator(x)
  112. migrations.AddMigrations(migrator)
  113. for _, descriptor := range registry.GetServices() {
  114. sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
  115. if ok {
  116. sc.AddMigration(migrator)
  117. }
  118. }
  119. if err := migrator.Start(); err != nil {
  120. return fmt.Errorf("Migration failed err: %v", err)
  121. }
  122. // Init repo instances
  123. annotations.SetRepository(&SqlAnnotationRepo{})
  124. ss.Bus.SetTransactionManager(ss)
  125. // Register handlers
  126. ss.addUserQueryAndCommandHandlers()
  127. // ensure admin user
  128. if ss.skipEnsureAdmin {
  129. return nil
  130. }
  131. return ss.ensureAdminUser()
  132. }
  133. func (ss *SqlStore) ensureAdminUser() error {
  134. systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
  135. err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
  136. err := bus.DispatchCtx(ctx, &systemUserCountQuery)
  137. if err != nil {
  138. return fmt.Errorf("Could not determine if admin user exists: %v", err)
  139. }
  140. if systemUserCountQuery.Result.Count > 0 {
  141. return nil
  142. }
  143. cmd := m.CreateUserCommand{}
  144. cmd.Login = setting.AdminUser
  145. cmd.Email = setting.AdminUser + "@localhost"
  146. cmd.Password = setting.AdminPassword
  147. cmd.IsAdmin = true
  148. if err := bus.DispatchCtx(ctx, &cmd); err != nil {
  149. return fmt.Errorf("Failed to create admin user: %v", err)
  150. }
  151. ss.log.Info("Created default admin", "user", setting.AdminUser)
  152. return nil
  153. })
  154. return err
  155. }
  156. func (ss *SqlStore) buildExtraConnectionString(sep rune) string {
  157. if ss.dbCfg.UrlQueryParams == nil {
  158. return ""
  159. }
  160. var sb strings.Builder
  161. for key, values := range ss.dbCfg.UrlQueryParams {
  162. for _, value := range values {
  163. sb.WriteRune(sep)
  164. sb.WriteString(key)
  165. sb.WriteRune('=')
  166. sb.WriteString(value)
  167. }
  168. }
  169. return sb.String()
  170. }
  171. func (ss *SqlStore) buildConnectionString() (string, error) {
  172. cnnstr := ss.dbCfg.ConnectionString
  173. // special case used by integration tests
  174. if cnnstr != "" {
  175. return cnnstr, nil
  176. }
  177. switch ss.dbCfg.Type {
  178. case migrator.MYSQL:
  179. protocol := "tcp"
  180. if strings.HasPrefix(ss.dbCfg.Host, "/") {
  181. protocol = "unix"
  182. }
  183. cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
  184. ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
  185. if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
  186. tlsCert, err := makeCert("custom", ss.dbCfg)
  187. if err != nil {
  188. return "", err
  189. }
  190. mysql.RegisterTLSConfig("custom", tlsCert)
  191. cnnstr += "&tls=custom"
  192. }
  193. cnnstr += ss.buildExtraConnectionString('&')
  194. case migrator.POSTGRES:
  195. host, port, err := util.SplitIPPort(ss.dbCfg.Host, "5432")
  196. if err != nil {
  197. return "", err
  198. }
  199. if ss.dbCfg.Pwd == "" {
  200. ss.dbCfg.Pwd = "''"
  201. }
  202. if ss.dbCfg.User == "" {
  203. ss.dbCfg.User = "''"
  204. }
  205. cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
  206. cnnstr += ss.buildExtraConnectionString(' ')
  207. case migrator.SQLITE:
  208. // special case for tests
  209. if !filepath.IsAbs(ss.dbCfg.Path) {
  210. ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
  211. }
  212. os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
  213. cnnstr = fmt.Sprintf("file:%s?cache=%s&mode=rwc", ss.dbCfg.Path, ss.dbCfg.CacheMode)
  214. cnnstr += ss.buildExtraConnectionString('&')
  215. default:
  216. return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
  217. }
  218. return cnnstr, nil
  219. }
  220. func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
  221. connectionString, err := ss.buildConnectionString()
  222. if err != nil {
  223. return nil, err
  224. }
  225. sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
  226. engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
  227. if err != nil {
  228. return nil, err
  229. }
  230. engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
  231. engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
  232. engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
  233. // configure sql logging
  234. debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
  235. if !debugSql {
  236. engine.SetLogger(&xorm.DiscardLogger{})
  237. } else {
  238. engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
  239. engine.ShowSQL(true)
  240. engine.ShowExecTime(true)
  241. }
  242. return engine, nil
  243. }
  244. func (ss *SqlStore) readConfig() {
  245. sec := ss.Cfg.Raw.Section("database")
  246. cfgURL := sec.Key("url").String()
  247. if len(cfgURL) != 0 {
  248. dbURL, _ := url.Parse(cfgURL)
  249. ss.dbCfg.Type = dbURL.Scheme
  250. ss.dbCfg.Host = dbURL.Host
  251. pathSplit := strings.Split(dbURL.Path, "/")
  252. if len(pathSplit) > 1 {
  253. ss.dbCfg.Name = pathSplit[1]
  254. }
  255. userInfo := dbURL.User
  256. if userInfo != nil {
  257. ss.dbCfg.User = userInfo.Username()
  258. ss.dbCfg.Pwd, _ = userInfo.Password()
  259. }
  260. ss.dbCfg.UrlQueryParams = dbURL.Query()
  261. } else {
  262. ss.dbCfg.Type = sec.Key("type").String()
  263. ss.dbCfg.Host = sec.Key("host").String()
  264. ss.dbCfg.Name = sec.Key("name").String()
  265. ss.dbCfg.User = sec.Key("user").String()
  266. ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
  267. ss.dbCfg.Pwd = sec.Key("password").String()
  268. }
  269. ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
  270. ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
  271. ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
  272. ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
  273. ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
  274. ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
  275. ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
  276. ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
  277. ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
  278. ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
  279. }
  280. func InitTestDB(t *testing.T) *SqlStore {
  281. t.Helper()
  282. sqlstore := &SqlStore{}
  283. sqlstore.skipEnsureAdmin = true
  284. sqlstore.Bus = bus.New()
  285. sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
  286. dbType := migrator.SQLITE
  287. // environment variable present for test db?
  288. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  289. dbType = db
  290. }
  291. // set test db config
  292. sqlstore.Cfg = setting.NewCfg()
  293. sec, _ := sqlstore.Cfg.Raw.NewSection("database")
  294. sec.NewKey("type", dbType)
  295. switch dbType {
  296. case "mysql":
  297. sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
  298. case "postgres":
  299. sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
  300. default:
  301. sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
  302. }
  303. // need to get engine to clean db before we init
  304. engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
  305. if err != nil {
  306. t.Fatalf("Failed to init test database: %v", err)
  307. }
  308. sqlstore.Dialect = migrator.NewDialect(engine)
  309. // temp global var until we get rid of global vars
  310. dialect = sqlstore.Dialect
  311. if err := dialect.CleanDB(); err != nil {
  312. t.Fatalf("Failed to clean test db %v", err)
  313. }
  314. if err := sqlstore.Init(); err != nil {
  315. t.Fatalf("Failed to init test database: %v", err)
  316. }
  317. sqlstore.engine.DatabaseTZ = time.UTC
  318. sqlstore.engine.TZLocation = time.UTC
  319. return sqlstore
  320. }
  321. func IsTestDbMySql() bool {
  322. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  323. return db == migrator.MYSQL
  324. }
  325. return false
  326. }
  327. func IsTestDbPostgres() bool {
  328. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  329. return db == migrator.POSTGRES
  330. }
  331. return false
  332. }
  333. type DatabaseConfig struct {
  334. Type string
  335. Host string
  336. Name string
  337. User string
  338. Pwd string
  339. Path string
  340. SslMode string
  341. CaCertPath string
  342. ClientKeyPath string
  343. ClientCertPath string
  344. ServerCertName string
  345. ConnectionString string
  346. MaxOpenConn int
  347. MaxIdleConn int
  348. ConnMaxLifetime int
  349. CacheMode string
  350. UrlQueryParams map[string][]string
  351. }